Merge "Created new class KeypairSettingsError."
[snaps.git] / snaps / openstack / tests / create_image_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 try:
17     from urllib.request import URLError
18 except ImportError:
19     from urllib2 import URLError
20
21 import logging
22 import shutil
23 import unittest
24 import uuid
25
26 import os
27
28 from snaps import file_utils
29 from snaps.openstack import create_image
30 from snaps.openstack.create_image import (ImageSettings, ImageCreationError,
31                                           ImageSettingsError)
32 from snaps.openstack.tests import openstack_tests
33 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
34 from snaps.openstack.utils import glance_utils
35
36 __author__ = 'spisarski'
37
38 logger = logging.getLogger('create_image_tests')
39
40
41 class ImageSettingsUnitTests(unittest.TestCase):
42     """
43     Tests the construction of the ImageSettings class
44     """
45
46     def test_no_params(self):
47         with self.assertRaises(ImageSettingsError):
48             ImageSettings()
49
50     def test_empty_config(self):
51         with self.assertRaises(ImageSettingsError):
52             ImageSettings(**dict())
53
54     def test_name_only(self):
55         with self.assertRaises(ImageSettingsError):
56             ImageSettings(name='foo')
57
58     def test_config_with_name_only(self):
59         with self.assertRaises(ImageSettingsError):
60             ImageSettings(**{'name': 'foo'})
61
62     def test_name_user_only(self):
63         with self.assertRaises(ImageSettingsError):
64             ImageSettings(name='foo', image_user='bar')
65
66     def test_config_with_name_user_only(self):
67         with self.assertRaises(ImageSettingsError):
68             ImageSettings(**{'name': 'foo', 'image_user': 'bar'})
69
70     def test_name_user_format_only(self):
71         with self.assertRaises(ImageSettingsError):
72             ImageSettings(name='foo', image_user='bar', img_format='qcow2')
73
74     def test_config_with_name_user_format_only(self):
75         with self.assertRaises(ImageSettingsError):
76             ImageSettings(
77                 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2'})
78
79     def test_name_user_format_url_file_only(self):
80         with self.assertRaises(ImageSettingsError):
81             ImageSettings(name='foo', image_user='bar', img_format='qcow2',
82                           url='http://foo.com',
83                           image_file='/foo/bar.qcow')
84
85     def test_config_with_name_user_format_url_file_only(self):
86         with self.assertRaises(ImageSettingsError):
87             ImageSettings(
88                 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
89                    'download_url': 'http://foo.com',
90                    'image_file': '/foo/bar.qcow'})
91
92     def test_name_user_format_url_only(self):
93         settings = ImageSettings(name='foo', image_user='bar',
94                                  img_format='qcow2', url='http://foo.com')
95         self.assertEqual('foo', settings.name)
96         self.assertEqual('bar', settings.image_user)
97         self.assertEqual('qcow2', settings.format)
98         self.assertEqual('http://foo.com', settings.url)
99         self.assertIsNone(settings.image_file)
100         self.assertFalse(settings.exists)
101         self.assertFalse(settings.public)
102         self.assertIsNone(settings.nic_config_pb_loc)
103
104     def test_name_user_format_url_only_properties(self):
105         properties = {'hw_video_model': 'vga'}
106         settings = ImageSettings(name='foo', image_user='bar',
107                                  img_format='qcow2', url='http://foo.com',
108                                  extra_properties=properties)
109         self.assertEqual('foo', settings.name)
110         self.assertEqual('bar', settings.image_user)
111         self.assertEqual('qcow2', settings.format)
112         self.assertEqual('http://foo.com', settings.url)
113         self.assertEqual(properties, settings.extra_properties)
114         self.assertIsNone(settings.image_file)
115         self.assertFalse(settings.exists)
116         self.assertFalse(settings.public)
117         self.assertIsNone(settings.nic_config_pb_loc)
118
119     def test_config_with_name_user_format_url_only(self):
120         settings = ImageSettings(
121             **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
122                'download_url': 'http://foo.com'})
123         self.assertEqual('foo', settings.name)
124         self.assertEqual('bar', settings.image_user)
125         self.assertEqual('qcow2', settings.format)
126         self.assertEqual('http://foo.com', settings.url)
127         self.assertIsNone(settings.image_file)
128         self.assertFalse(settings.exists)
129         self.assertFalse(settings.public)
130         self.assertIsNone(settings.nic_config_pb_loc)
131
132     def test_name_user_format_file_only(self):
133         settings = ImageSettings(name='foo', image_user='bar',
134                                  img_format='qcow2',
135                                  image_file='/foo/bar.qcow')
136         self.assertEqual('foo', settings.name)
137         self.assertEqual('bar', settings.image_user)
138         self.assertEqual('qcow2', settings.format)
139         self.assertIsNone(settings.url)
140         self.assertEqual('/foo/bar.qcow', settings.image_file)
141         self.assertFalse(settings.exists)
142         self.assertFalse(settings.public)
143         self.assertIsNone(settings.nic_config_pb_loc)
144
145     def test_config_with_name_user_format_file_only(self):
146         settings = ImageSettings(
147             **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
148                'image_file': '/foo/bar.qcow'})
149         self.assertEqual('foo', settings.name)
150         self.assertEqual('bar', settings.image_user)
151         self.assertEqual('qcow2', settings.format)
152         self.assertIsNone(settings.url)
153         self.assertEqual('/foo/bar.qcow', settings.image_file)
154         self.assertFalse(settings.exists)
155         self.assertFalse(settings.public)
156         self.assertIsNone(settings.nic_config_pb_loc)
157
158     def test_all_url(self):
159         properties = {'hw_video_model': 'vga'}
160         kernel_settings = ImageSettings(name='kernel', url='http://kernel.com',
161                                         image_user='bar', img_format='qcow2')
162         ramdisk_settings = ImageSettings(name='ramdisk',
163                                          url='http://ramdisk.com',
164                                          image_user='bar', img_format='qcow2')
165         settings = ImageSettings(name='foo', image_user='bar',
166                                  img_format='qcow2', url='http://foo.com',
167                                  extra_properties=properties,
168                                  nic_config_pb_loc='/foo/bar',
169                                  kernel_image_settings=kernel_settings,
170                                  ramdisk_image_settings=ramdisk_settings,
171                                  exists=True, public=True)
172         self.assertEqual('foo', settings.name)
173         self.assertEqual('bar', settings.image_user)
174         self.assertEqual('qcow2', settings.format)
175         self.assertEqual('http://foo.com', settings.url)
176         self.assertEqual(properties, settings.extra_properties)
177         self.assertIsNone(settings.image_file)
178         self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
179         self.assertEqual('kernel', settings.kernel_image_settings.name)
180         self.assertEqual('http://kernel.com',
181                          settings.kernel_image_settings.url)
182         self.assertEqual('bar', settings.kernel_image_settings.image_user)
183         self.assertEqual('qcow2', settings.kernel_image_settings.format)
184         self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
185         self.assertEqual('http://ramdisk.com',
186                          settings.ramdisk_image_settings.url)
187         self.assertEqual('bar', settings.ramdisk_image_settings.image_user)
188         self.assertEqual('qcow2', settings.ramdisk_image_settings.format)
189         self.assertTrue(settings.exists)
190         self.assertTrue(settings.public)
191
192     def test_config_all_url(self):
193         settings = ImageSettings(
194             **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
195                'download_url': 'http://foo.com',
196                'extra_properties': '{\'hw_video_model\': \'vga\'}',
197                'nic_config_pb_loc': '/foo/bar',
198                'kernel_image_settings': {
199                    'name': 'kernel',
200                    'download_url': 'http://kernel.com',
201                    'image_user': 'bar',
202                    'format': 'qcow2'},
203                'ramdisk_image_settings': {
204                    'name': 'ramdisk',
205                    'download_url': 'http://ramdisk.com',
206                    'image_user': 'bar',
207                    'format': 'qcow2'},
208                'exists': True, 'public': True})
209         self.assertEqual('foo', settings.name)
210         self.assertEqual('bar', settings.image_user)
211         self.assertEqual('qcow2', settings.format)
212         self.assertEqual('http://foo.com', settings.url)
213         self.assertEqual('{\'hw_video_model\': \'vga\'}',
214                          settings.extra_properties)
215         self.assertIsNone(settings.image_file)
216         self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
217         self.assertEqual('kernel', settings.kernel_image_settings.name)
218         self.assertEqual('http://kernel.com',
219                          settings.kernel_image_settings.url)
220         self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
221         self.assertEqual('http://ramdisk.com',
222                          settings.ramdisk_image_settings.url)
223         self.assertTrue(settings.exists)
224         self.assertTrue(settings.public)
225
226     def test_all_file(self):
227         properties = {'hw_video_model': 'vga'}
228         settings = ImageSettings(name='foo', image_user='bar',
229                                  img_format='qcow2',
230                                  image_file='/foo/bar.qcow',
231                                  extra_properties=properties,
232                                  nic_config_pb_loc='/foo/bar', exists=True,
233                                  public=True)
234         self.assertEqual('foo', settings.name)
235         self.assertEqual('bar', settings.image_user)
236         self.assertEqual('qcow2', settings.format)
237         self.assertIsNone(settings.url)
238         self.assertEqual('/foo/bar.qcow', settings.image_file)
239         self.assertEqual(properties, settings.extra_properties)
240         self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
241         self.assertTrue(settings.exists)
242         self.assertTrue(settings.public)
243
244     def test_config_all_file(self):
245         settings = ImageSettings(
246             **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
247                'image_file': '/foo/bar.qcow',
248                'extra_properties': '{\'hw_video_model\' : \'vga\'}',
249                'nic_config_pb_loc': '/foo/bar', 'exists': True,
250                'public': True})
251         self.assertEqual('foo', settings.name)
252         self.assertEqual('bar', settings.image_user)
253         self.assertEqual('qcow2', settings.format)
254         self.assertIsNone(settings.url)
255         self.assertEqual('/foo/bar.qcow', settings.image_file)
256         self.assertEqual('{\'hw_video_model\' : \'vga\'}',
257                          settings.extra_properties)
258         self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
259         self.assertTrue(settings.exists)
260         self.assertTrue(settings.public)
261
262
263 class CreateImageSuccessTests(OSIntegrationTestCase):
264     """
265     Test for the CreateImage class defined in create_image.py
266     """
267
268     def setUp(self):
269         """
270         Instantiates the CreateImage object that is responsible for downloading
271         and creating an OS image file within OpenStack
272         """
273         super(self.__class__, self).__start__()
274
275         guid = uuid.uuid4()
276         self.image_name = self.__class__.__name__ + '-' + str(guid)
277         self.glance = glance_utils.glance_client(self.os_creds)
278         self.image_creator = None
279
280         if self.image_metadata and 'glance_tests' in self.image_metadata:
281             glance_test_meta = self.image_metadata['glance_tests']
282         else:
283             glance_test_meta = None
284
285         self.tmp_dir = 'tmp/' + str(guid)
286         if not os.path.exists(self.tmp_dir):
287             os.makedirs(self.tmp_dir)
288
289         self.image_settings = openstack_tests.cirros_image_settings(
290             name=self.image_name,
291             image_metadata=glance_test_meta)
292
293     def tearDown(self):
294         """
295         Cleans the image and downloaded image file
296         """
297         if self.image_creator:
298             self.image_creator.clean()
299
300         if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
301             shutil.rmtree(self.tmp_dir)
302
303         super(self.__class__, self).__clean__()
304
305     def test_create_image_clean_url(self):
306         """
307         Tests the creation of an OpenStack image from a URL.
308         """
309         # Create Image
310         # Set the default image settings, then set any custom parameters sent
311         # from the app
312
313         self.image_creator = create_image.OpenStackImage(self.os_creds,
314                                                          self.image_settings)
315         created_image = self.image_creator.create()
316         self.assertIsNotNone(created_image)
317
318         retrieved_image = glance_utils.get_image(self.glance,
319                                                  self.image_settings.name)
320         self.assertIsNotNone(retrieved_image)
321         self.assertEqual(created_image.size, retrieved_image.size)
322         self.assertEqual(get_image_size(self.image_settings),
323                          retrieved_image.size)
324         self.assertEqual(created_image.name, retrieved_image.name)
325         self.assertEqual(created_image.id, retrieved_image.id)
326
327     def test_create_image_clean_url_properties(self):
328         """
329         Tests the creation of an OpenStack image from a URL and set properties.
330         """
331         # Create Image
332         # Set the default image settings, then set any custom parameters sent
333         # from the app
334         self.image_creator = create_image.OpenStackImage(self.os_creds,
335                                                          self.image_settings)
336         created_image = self.image_creator.create()
337         self.assertIsNotNone(created_image)
338
339         retrieved_image = glance_utils.get_image(self.glance,
340                                                  self.image_settings.name)
341         self.assertIsNotNone(retrieved_image)
342         self.assertEqual(self.image_creator.get_image().size,
343                          retrieved_image.size)
344         self.assertEqual(get_image_size(self.image_settings),
345                          retrieved_image.size)
346         self.assertEqual(created_image.name, retrieved_image.name)
347         self.assertEqual(created_image.id, retrieved_image.id)
348         self.assertEqual(created_image.properties, retrieved_image.properties)
349
350     def test_create_image_clean_file(self):
351         """
352         Tests the creation of an OpenStack image from a file.
353         """
354         if not self.image_settings.image_file and self.image_settings.url:
355             # Download the file of the image
356             image_file_name = file_utils.download(self.image_settings.url,
357                                                   self.tmp_dir).name
358         else:
359             image_file_name = self.image_settings.image_file
360
361         if image_file_name:
362             file_image_settings = openstack_tests.file_image_test_settings(
363                 name=self.image_name, file_path=image_file_name)
364
365             self.image_creator = create_image.OpenStackImage(
366                 self.os_creds, file_image_settings)
367             created_image = self.image_creator.create()
368             self.assertIsNotNone(created_image)
369             self.assertEqual(self.image_name, created_image.name)
370
371             retrieved_image = glance_utils.get_image(
372                 self.glance, file_image_settings.name)
373             self.assertIsNotNone(retrieved_image)
374             self.assertEqual(self.image_creator.get_image().size,
375                              retrieved_image.size)
376             self.assertEqual(get_image_size(file_image_settings),
377                              retrieved_image.size)
378
379             self.assertEqual(created_image.name, retrieved_image.name)
380             self.assertEqual(created_image.id, retrieved_image.id)
381         else:
382             logger.warn(
383                 'Test not executed as the image metadata requires image files')
384
385     def test_create_delete_image(self):
386         """
387         Tests the creation then deletion of an OpenStack image to ensure
388         clean() does not raise an Exception.
389         """
390         # Create Image
391         self.image_creator = create_image.OpenStackImage(self.os_creds,
392                                                          self.image_settings)
393         created_image = self.image_creator.create()
394         self.assertIsNotNone(created_image)
395
396         retrieved_image = glance_utils.get_image(self.glance,
397                                                  self.image_settings.name)
398         self.assertIsNotNone(retrieved_image)
399         self.assertEqual(self.image_creator.get_image().size,
400                          retrieved_image.size)
401         self.assertEqual(get_image_size(self.image_settings),
402                          retrieved_image.size)
403
404         # Delete Image manually
405         glance_utils.delete_image(self.glance, created_image)
406
407         self.assertIsNone(glance_utils.get_image(
408             self.glance, self.image_creator.image_settings.name))
409
410         # Must not throw an exception when attempting to cleanup non-existent
411         # image
412         self.image_creator.clean()
413         self.assertIsNone(self.image_creator.get_image())
414
415     def test_create_same_image(self):
416         """
417         Tests the creation of an OpenStack image when the image already exists.
418         """
419         # Create Image
420         self.image_creator = create_image.OpenStackImage(self.os_creds,
421                                                          self.image_settings)
422         image1 = self.image_creator.create()
423
424         retrieved_image = glance_utils.get_image(self.glance,
425                                                  self.image_settings.name)
426         self.assertIsNotNone(retrieved_image)
427         self.assertEqual(self.image_creator.get_image().size,
428                          retrieved_image.size)
429         self.assertEqual(get_image_size(self.image_settings),
430                          retrieved_image.size)
431         self.assertEqual(image1.name, retrieved_image.name)
432         self.assertEqual(image1.id, retrieved_image.id)
433         self.assertEqual(image1.properties, retrieved_image.properties)
434
435         # Should be retrieving the instance data
436         os_image_2 = create_image.OpenStackImage(self.os_creds,
437                                                  self.image_settings)
438         image2 = os_image_2.create()
439         self.assertEqual(image1.id, image2.id)
440
441     def test_create_same_image_new_settings(self):
442         """
443         Tests the creation of an OpenStack image when the image already exists
444         and the configuration only contains the name.
445         """
446         # Create Image
447         self.image_creator = create_image.OpenStackImage(self.os_creds,
448                                                          self.image_settings)
449         image1 = self.image_creator.create()
450
451         retrieved_image = glance_utils.get_image(self.glance,
452                                                  self.image_settings.name)
453         self.assertIsNotNone(retrieved_image)
454         self.assertEqual(self.image_creator.get_image().size,
455                          retrieved_image.size)
456         self.assertEqual(get_image_size(self.image_settings),
457                          retrieved_image.size)
458         self.assertEqual(image1.name, retrieved_image.name)
459         self.assertEqual(image1.id, retrieved_image.id)
460         self.assertEqual(image1.properties, retrieved_image.properties)
461
462         # Should be retrieving the instance data
463         image_2_settings = ImageSettings(name=self.image_settings.name,
464                                          image_user='foo', exists=True)
465         os_image_2 = create_image.OpenStackImage(self.os_creds,
466                                                  image_2_settings)
467         image2 = os_image_2.create()
468         self.assertEqual(image1.id, image2.id)
469
470
471 class CreateImageNegativeTests(OSIntegrationTestCase):
472     """
473     Negative test cases for the CreateImage class
474     """
475
476     def setUp(self):
477         super(self.__class__, self).__start__()
478
479         self.image_name = self.__class__.__name__ + '-' + str(uuid.uuid4())
480         self.image_creator = None
481
482     def tearDown(self):
483         if self.image_creator:
484             self.image_creator.clean()
485
486         super(self.__class__, self).__clean__()
487
488     def test_bad_image_name(self):
489         """
490         Expect an ImageCreationError when the image name does not exist when a
491         file or URL has not been configured
492         """
493         os_image_settings = ImageSettings(name='foo', image_user='bar',
494                                           exists=True)
495         self.image_creator = create_image.OpenStackImage(self.os_creds,
496                                                          os_image_settings)
497
498         with self.assertRaises(ImageCreationError):
499             self.image_creator.create()
500
501             self.fail('ImageCreationError should have been raised prior to'
502                       'this line')
503
504     def test_bad_image_url(self):
505         """
506         Expect an ImageCreationError when the image download url is bad
507         """
508         os_image_settings = openstack_tests.cirros_image_settings(
509             name=self.image_name)
510         self.image_creator = create_image.OpenStackImage(
511             self.os_creds,
512             create_image.ImageSettings(name=os_image_settings.name,
513                                        image_user=os_image_settings.image_user,
514                                        img_format=os_image_settings.format,
515                                        url="http://foo.bar"))
516
517         with self.assertRaises(URLError):
518             self.image_creator.create()
519
520     def test_bad_image_file(self):
521         """
522         Expect an ImageCreationError when the image file does not exist
523         """
524         os_image_settings = openstack_tests.cirros_image_settings(
525             name=self.image_name)
526         self.image_creator = create_image.OpenStackImage(
527             self.os_creds,
528             create_image.ImageSettings(name=os_image_settings.name,
529                                        image_user=os_image_settings.image_user,
530                                        img_format=os_image_settings.format,
531                                        image_file="/foo/bar.qcow"))
532         with self.assertRaises(IOError):
533             self.image_creator.create()
534
535
536 class CreateMultiPartImageTests(OSIntegrationTestCase):
537     """
538     Test different means for creating a 3-part images
539     """
540
541     def setUp(self):
542         """
543         Instantiates the CreateImage object that is responsible for
544         downloading and creating an OS image file within OpenStack
545         """
546         super(self.__class__, self).__start__()
547
548         guid = uuid.uuid4()
549         self.image_creators = list()
550         self.image_name = self.__class__.__name__ + '-' + str(guid)
551         self.glance = glance_utils.glance_client(self.os_creds)
552
553         self.tmp_dir = 'tmp/' + str(guid)
554         if not os.path.exists(self.tmp_dir):
555             os.makedirs(self.tmp_dir)
556
557         if self.image_metadata and 'glance_tests' in self.image_metadata:
558             self.glance_test_meta = self.image_metadata['glance_tests']
559         else:
560             self.glance_test_meta = dict()
561
562     def tearDown(self):
563         """
564         Cleans the images and downloaded image file
565         """
566         for image_creator in self.image_creators:
567             image_creator.clean()
568
569         if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
570             shutil.rmtree(self.tmp_dir)
571
572         super(self.__class__, self).__clean__()
573
574     def test_create_three_part_image_from_url(self):
575         """
576         Tests the creation of a 3-part OpenStack image from a URL.
577         """
578         # Create the kernel image
579         if 'disk_file' not in self.glance_test_meta:
580             image_settings = openstack_tests.cirros_image_settings(
581                 name=self.image_name,
582                 image_metadata={
583                     'disk_url':
584                         openstack_tests.CIRROS_DEFAULT_IMAGE_URL,
585                     'kernel_url':
586                         openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL,
587                     'ramdisk_url':
588                         openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL})
589
590             image_creator = create_image.OpenStackImage(self.os_creds,
591                                                         image_settings)
592             self.image_creators.append(image_creator)
593             image_creator.create()
594
595             main_image = glance_utils.get_image(self.glance,
596                                                 image_settings.name)
597             self.assertIsNotNone(main_image)
598             self.assertIsNotNone(image_creator.get_image())
599             self.assertEqual(image_creator.get_image().id, main_image.id)
600
601             kernel_image = glance_utils.get_image(
602                 self.glance, image_settings.kernel_image_settings.name)
603             self.assertIsNotNone(kernel_image)
604             self.assertIsNotNone(image_creator.get_kernel_image())
605             self.assertEqual(kernel_image.id,
606                              image_creator.get_kernel_image().id)
607
608             ramdisk_image = glance_utils.get_image(
609                 self.glance, image_settings.ramdisk_image_settings.name)
610             self.assertIsNotNone(ramdisk_image)
611             self.assertIsNotNone(image_creator.get_ramdisk_image())
612             self.assertEqual(ramdisk_image.id,
613                              image_creator.get_ramdisk_image().id)
614         else:
615             logger.warn(
616                 'Test not executed as the image metadata requires image files')
617
618     def test_create_three_part_image_from_file_3_creators(self):
619         """
620         Tests the creation of a 3-part OpenStack image from files.
621         """
622         file_only = False
623
624         # Set properties
625         properties = {}
626         if self.glance_test_meta:
627             if 'extra_properties' in self.glance_test_meta:
628                 properties = self.glance_test_meta['extra_properties']
629             if 'disk_file' in self.glance_test_meta:
630                 file_only = True
631
632         # Create the kernel image
633         kernel_file_name = None
634         kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
635         if 'kernel_file' in self.glance_test_meta:
636             kernel_file_name = self.glance_test_meta['kernel_file']
637         elif 'kernel_url' in self.glance_test_meta:
638             kernel_url = self.glance_test_meta['kernel_url']
639         else:
640             kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
641
642         if not kernel_file_name and not file_only:
643             kernel_file_name = file_utils.download(kernel_url,
644                                                    self.tmp_dir).name
645         else:
646             logger.warn('Will not download the kernel image.'
647                         ' Cannot execute test')
648             return
649
650         kernel_file_image_settings = openstack_tests.file_image_test_settings(
651             name=self.image_name + '_kernel', file_path=kernel_file_name)
652
653         self.image_creators.append(create_image.OpenStackImage(
654             self.os_creds, kernel_file_image_settings))
655         kernel_image = self.image_creators[-1].create()
656         self.assertIsNotNone(kernel_image)
657         self.assertEqual(get_image_size(kernel_file_image_settings),
658                          kernel_image.size)
659
660         # Create the ramdisk image
661         ramdisk_file_name = None
662         ramdisk_url = openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL
663         if 'ramdisk_file' in self.glance_test_meta:
664             ramdisk_file_name = self.glance_test_meta['ramdisk_file']
665         elif 'ramdisk_url' in self.glance_test_meta:
666             ramdisk_url = self.glance_test_meta['ramdisk_url']
667
668         if not ramdisk_file_name and not file_only:
669             ramdisk_file_name = file_utils.download(ramdisk_url,
670                                                     self.tmp_dir).name
671         else:
672             logger.warn('Will not download the ramdisk image.'
673                         ' Cannot execute test')
674             return
675
676         ramdisk_file_image_settings = openstack_tests.file_image_test_settings(
677             name=self.image_name + '_ramdisk', file_path=ramdisk_file_name)
678         self.image_creators.append(create_image.OpenStackImage(
679             self.os_creds, ramdisk_file_image_settings))
680         ramdisk_image = self.image_creators[-1].create()
681         self.assertIsNotNone(ramdisk_image)
682         self.assertEqual(get_image_size(ramdisk_file_image_settings),
683                          ramdisk_image.size)
684
685         # Create the main disk image
686         disk_file_name = None
687         disk_url = openstack_tests.CIRROS_DEFAULT_IMAGE_URL
688         if 'disk_file' in self.glance_test_meta:
689             disk_file_name = self.glance_test_meta['disk_file']
690         elif 'disk_url' in self.glance_test_meta:
691             disk_url = self.glance_test_meta['disk_url']
692
693         if not disk_file_name and not file_only:
694             disk_file_name = file_utils.download(disk_url, self.tmp_dir).name
695         else:
696             logger.warn('Will not download the disk file image.'
697                         ' Cannot execute test')
698             return
699
700         file_image_settings = openstack_tests.file_image_test_settings(
701             name=self.image_name, file_path=disk_file_name)
702         properties['kernel_id'] = kernel_image.id
703         properties['ramdisk_id'] = ramdisk_image.id
704         file_image_settings.extra_properties = properties
705         self.image_creators.append(
706             create_image.OpenStackImage(self.os_creds, file_image_settings))
707         created_image = self.image_creators[-1].create()
708
709         self.assertIsNotNone(created_image)
710         self.assertEqual(self.image_name, created_image.name)
711
712         retrieved_image = glance_utils.get_image(self.glance,
713                                                  file_image_settings.name)
714         self.assertIsNotNone(retrieved_image)
715         self.assertEqual(self.image_creators[-1].get_image().size,
716                          retrieved_image.size)
717         self.assertEqual(get_image_size(file_image_settings),
718                          retrieved_image.size)
719         self.assertEqual(created_image.name, retrieved_image.name)
720         self.assertEqual(created_image.id, retrieved_image.id)
721         self.assertEqual(created_image.properties, retrieved_image.properties)
722
723     def test_create_three_part_image_from_url_3_creators(self):
724         """
725         Tests the creation of a 3-part OpenStack image from a URL.
726         """
727         if 'disk_file' not in self.glance_test_meta:
728             # Set properties
729             properties = {}
730             if self.glance_test_meta and \
731                     'extra_properties' in self.glance_test_meta:
732                 properties = self.glance_test_meta['extra_properties']
733
734             # Create the kernel image
735             kernel_image_settings = openstack_tests.cirros_image_settings(
736                 name=self.image_name + '_kernel',
737                 url=openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL)
738
739             if self.glance_test_meta:
740                 if 'kernel_url' in self.glance_test_meta:
741                     kernel_image_settings.url = self.glance_test_meta[
742                         'kernel_url']
743             self.image_creators.append(
744                 create_image.OpenStackImage(self.os_creds,
745                                             kernel_image_settings))
746             kernel_image = self.image_creators[-1].create()
747             self.assertIsNotNone(kernel_image)
748             self.assertEqual(get_image_size(kernel_image_settings),
749                              kernel_image.size)
750
751             # Create the ramdisk image
752             ramdisk_image_settings = openstack_tests.cirros_image_settings(
753                 name=self.image_name + '_ramdisk',
754                 url=openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL)
755             if self.glance_test_meta:
756                 if 'ramdisk_url' in self.glance_test_meta:
757                     ramdisk_image_settings.url = self.glance_test_meta[
758                         'ramdisk_url']
759             self.image_creators.append(
760                 create_image.OpenStackImage(self.os_creds,
761                                             ramdisk_image_settings))
762             ramdisk_image = self.image_creators[-1].create()
763             self.assertIsNotNone(ramdisk_image)
764             self.assertEqual(get_image_size(ramdisk_image_settings),
765                              ramdisk_image.size)
766
767             # Create the main image
768             os_image_settings = openstack_tests.cirros_image_settings(
769                 name=self.image_name,
770                 url=openstack_tests.CIRROS_DEFAULT_IMAGE_URL)
771             if self.glance_test_meta:
772                 if 'disk_url' in self.glance_test_meta:
773                     os_image_settings.url = self.glance_test_meta['disk_url']
774
775             properties['kernel_id'] = kernel_image.id
776             properties['ramdisk_id'] = ramdisk_image.id
777             os_image_settings.extra_properties = properties
778
779             self.image_creators.append(
780                 create_image.OpenStackImage(self.os_creds, os_image_settings))
781             created_image = self.image_creators[-1].create()
782             self.assertIsNotNone(created_image)
783             self.assertEqual(self.image_name, created_image.name)
784
785             retrieved_image = glance_utils.get_image(self.glance,
786                                                      os_image_settings.name)
787             self.assertIsNotNone(retrieved_image)
788
789             self.assertEqual(self.image_creators[-1].get_image().size,
790                              retrieved_image.size)
791             self.assertEqual(get_image_size(os_image_settings),
792                              retrieved_image.size)
793
794             self.assertEqual(created_image.name, retrieved_image.name)
795             self.assertEqual(created_image.id, retrieved_image.id)
796             self.assertEqual(created_image.properties,
797                              retrieved_image.properties)
798         else:
799             logger.warn(
800                 'Test not executed as the image metadata requires image files')
801
802
803 def get_image_size(image_settings):
804     """
805     Returns the expected image size
806     :return:
807     """
808     if image_settings.image_file:
809         return os.path.getsize(image_settings.image_file)
810     elif image_settings.url:
811         return int(file_utils.get_content_length(image_settings.url))
812     else:
813         raise Exception(
814             'Cannot retrieve expected image size. Image filename or URL has '
815             'not been configured')