Added image_settings parameter to get_image().
[snaps.git] / snaps / openstack / tests / create_image_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 from glanceclient.exc import HTTPBadRequest
16
17 try:
18     from urllib.request import URLError
19 except ImportError:
20     from urllib2 import URLError
21
22 import logging
23 import shutil
24 import unittest
25 import uuid
26
27 import os
28
29 from snaps import file_utils
30 from snaps.openstack import create_image
31 from snaps.openstack.create_image import (ImageSettings, ImageCreationError,
32                                           ImageSettingsError)
33 from snaps.openstack.tests import openstack_tests
34 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
35 from snaps.openstack.utils import glance_utils
36
37 __author__ = 'spisarski'
38
39 logger = logging.getLogger('create_image_tests')
40
41
42 class ImageSettingsUnitTests(unittest.TestCase):
43     """
44     Tests the construction of the ImageSettings class
45     """
46
47     def test_no_params(self):
48         with self.assertRaises(ImageSettingsError):
49             ImageSettings()
50
51     def test_empty_config(self):
52         with self.assertRaises(ImageSettingsError):
53             ImageSettings(**dict())
54
55     def test_name_only(self):
56         with self.assertRaises(ImageSettingsError):
57             ImageSettings(name='foo')
58
59     def test_config_with_name_only(self):
60         with self.assertRaises(ImageSettingsError):
61             ImageSettings(**{'name': 'foo'})
62
63     def test_name_user_only(self):
64         with self.assertRaises(ImageSettingsError):
65             ImageSettings(name='foo', image_user='bar')
66
67     def test_config_with_name_user_only(self):
68         with self.assertRaises(ImageSettingsError):
69             ImageSettings(**{'name': 'foo', 'image_user': 'bar'})
70
71     def test_name_user_format_only(self):
72         with self.assertRaises(ImageSettingsError):
73             ImageSettings(name='foo', image_user='bar', img_format='qcow2')
74
75     def test_config_with_name_user_format_only(self):
76         with self.assertRaises(ImageSettingsError):
77             ImageSettings(
78                 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2'})
79
80     def test_name_user_format_url_file_only(self):
81         with self.assertRaises(ImageSettingsError):
82             ImageSettings(name='foo', image_user='bar', img_format='qcow2',
83                           url='http://foo.com',
84                           image_file='/foo/bar.qcow')
85
86     def test_config_with_name_user_format_url_file_only(self):
87         with self.assertRaises(ImageSettingsError):
88             ImageSettings(
89                 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
90                    'download_url': 'http://foo.com',
91                    'image_file': '/foo/bar.qcow'})
92
93     def test_name_user_format_url_only(self):
94         settings = ImageSettings(name='foo', image_user='bar',
95                                  img_format='qcow2', url='http://foo.com')
96         self.assertEqual('foo', settings.name)
97         self.assertEqual('bar', settings.image_user)
98         self.assertEqual('qcow2', settings.format)
99         self.assertEqual('http://foo.com', settings.url)
100         self.assertIsNone(settings.image_file)
101         self.assertFalse(settings.exists)
102         self.assertFalse(settings.public)
103         self.assertIsNone(settings.nic_config_pb_loc)
104
105     def test_name_user_format_url_only_properties(self):
106         properties = {'hw_video_model': 'vga'}
107         settings = ImageSettings(name='foo', image_user='bar',
108                                  img_format='qcow2', url='http://foo.com',
109                                  extra_properties=properties)
110         self.assertEqual('foo', settings.name)
111         self.assertEqual('bar', settings.image_user)
112         self.assertEqual('qcow2', settings.format)
113         self.assertEqual('http://foo.com', settings.url)
114         self.assertEqual(properties, settings.extra_properties)
115         self.assertIsNone(settings.image_file)
116         self.assertFalse(settings.exists)
117         self.assertFalse(settings.public)
118         self.assertIsNone(settings.nic_config_pb_loc)
119
120     def test_config_with_name_user_format_url_only(self):
121         settings = ImageSettings(
122             **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
123                'download_url': 'http://foo.com'})
124         self.assertEqual('foo', settings.name)
125         self.assertEqual('bar', settings.image_user)
126         self.assertEqual('qcow2', settings.format)
127         self.assertEqual('http://foo.com', settings.url)
128         self.assertIsNone(settings.image_file)
129         self.assertFalse(settings.exists)
130         self.assertFalse(settings.public)
131         self.assertIsNone(settings.nic_config_pb_loc)
132
133     def test_name_user_format_file_only(self):
134         settings = ImageSettings(name='foo', image_user='bar',
135                                  img_format='qcow2',
136                                  image_file='/foo/bar.qcow')
137         self.assertEqual('foo', settings.name)
138         self.assertEqual('bar', settings.image_user)
139         self.assertEqual('qcow2', settings.format)
140         self.assertIsNone(settings.url)
141         self.assertEqual('/foo/bar.qcow', settings.image_file)
142         self.assertFalse(settings.exists)
143         self.assertFalse(settings.public)
144         self.assertIsNone(settings.nic_config_pb_loc)
145
146     def test_config_with_name_user_format_file_only(self):
147         settings = ImageSettings(
148             **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
149                'image_file': '/foo/bar.qcow'})
150         self.assertEqual('foo', settings.name)
151         self.assertEqual('bar', settings.image_user)
152         self.assertEqual('qcow2', settings.format)
153         self.assertIsNone(settings.url)
154         self.assertEqual('/foo/bar.qcow', settings.image_file)
155         self.assertFalse(settings.exists)
156         self.assertFalse(settings.public)
157         self.assertIsNone(settings.nic_config_pb_loc)
158
159     def test_all_url(self):
160         properties = {'hw_video_model': 'vga'}
161         kernel_settings = ImageSettings(name='kernel', url='http://kernel.com',
162                                         image_user='bar', img_format='qcow2')
163         ramdisk_settings = ImageSettings(name='ramdisk',
164                                          url='http://ramdisk.com',
165                                          image_user='bar', img_format='qcow2')
166         settings = ImageSettings(name='foo', image_user='bar',
167                                  img_format='qcow2', url='http://foo.com',
168                                  extra_properties=properties,
169                                  nic_config_pb_loc='/foo/bar',
170                                  kernel_image_settings=kernel_settings,
171                                  ramdisk_image_settings=ramdisk_settings,
172                                  exists=True, public=True)
173         self.assertEqual('foo', settings.name)
174         self.assertEqual('bar', settings.image_user)
175         self.assertEqual('qcow2', settings.format)
176         self.assertEqual('http://foo.com', settings.url)
177         self.assertEqual(properties, settings.extra_properties)
178         self.assertIsNone(settings.image_file)
179         self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
180         self.assertEqual('kernel', settings.kernel_image_settings.name)
181         self.assertEqual('http://kernel.com',
182                          settings.kernel_image_settings.url)
183         self.assertEqual('bar', settings.kernel_image_settings.image_user)
184         self.assertEqual('qcow2', settings.kernel_image_settings.format)
185         self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
186         self.assertEqual('http://ramdisk.com',
187                          settings.ramdisk_image_settings.url)
188         self.assertEqual('bar', settings.ramdisk_image_settings.image_user)
189         self.assertEqual('qcow2', settings.ramdisk_image_settings.format)
190         self.assertTrue(settings.exists)
191         self.assertTrue(settings.public)
192
193     def test_config_all_url(self):
194         settings = ImageSettings(
195             **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
196                'download_url': 'http://foo.com',
197                'extra_properties': '{\'hw_video_model\': \'vga\'}',
198                'nic_config_pb_loc': '/foo/bar',
199                'kernel_image_settings': {
200                    'name': 'kernel',
201                    'download_url': 'http://kernel.com',
202                    'image_user': 'bar',
203                    'format': 'qcow2'},
204                'ramdisk_image_settings': {
205                    'name': 'ramdisk',
206                    'download_url': 'http://ramdisk.com',
207                    'image_user': 'bar',
208                    'format': 'qcow2'},
209                'exists': True, 'public': True})
210         self.assertEqual('foo', settings.name)
211         self.assertEqual('bar', settings.image_user)
212         self.assertEqual('qcow2', settings.format)
213         self.assertEqual('http://foo.com', settings.url)
214         self.assertEqual('{\'hw_video_model\': \'vga\'}',
215                          settings.extra_properties)
216         self.assertIsNone(settings.image_file)
217         self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
218         self.assertEqual('kernel', settings.kernel_image_settings.name)
219         self.assertEqual('http://kernel.com',
220                          settings.kernel_image_settings.url)
221         self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
222         self.assertEqual('http://ramdisk.com',
223                          settings.ramdisk_image_settings.url)
224         self.assertTrue(settings.exists)
225         self.assertTrue(settings.public)
226
227     def test_all_file(self):
228         properties = {'hw_video_model': 'vga'}
229         settings = ImageSettings(name='foo', image_user='bar',
230                                  img_format='qcow2',
231                                  image_file='/foo/bar.qcow',
232                                  extra_properties=properties,
233                                  nic_config_pb_loc='/foo/bar', exists=True,
234                                  public=True)
235         self.assertEqual('foo', settings.name)
236         self.assertEqual('bar', settings.image_user)
237         self.assertEqual('qcow2', settings.format)
238         self.assertIsNone(settings.url)
239         self.assertEqual('/foo/bar.qcow', settings.image_file)
240         self.assertEqual(properties, settings.extra_properties)
241         self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
242         self.assertTrue(settings.exists)
243         self.assertTrue(settings.public)
244
245     def test_config_all_file(self):
246         settings = ImageSettings(
247             **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
248                'image_file': '/foo/bar.qcow',
249                'extra_properties': '{\'hw_video_model\' : \'vga\'}',
250                'nic_config_pb_loc': '/foo/bar', 'exists': True,
251                'public': True})
252         self.assertEqual('foo', settings.name)
253         self.assertEqual('bar', settings.image_user)
254         self.assertEqual('qcow2', settings.format)
255         self.assertIsNone(settings.url)
256         self.assertEqual('/foo/bar.qcow', settings.image_file)
257         self.assertEqual('{\'hw_video_model\' : \'vga\'}',
258                          settings.extra_properties)
259         self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
260         self.assertTrue(settings.exists)
261         self.assertTrue(settings.public)
262
263
264 class CreateImageSuccessTests(OSIntegrationTestCase):
265     """
266     Test for the CreateImage class defined in create_image.py
267     """
268
269     def setUp(self):
270         """
271         Instantiates the CreateImage object that is responsible for downloading
272         and creating an OS image file within OpenStack
273         """
274         super(self.__class__, self).__start__()
275
276         guid = uuid.uuid4()
277         self.image_name = self.__class__.__name__ + '-' + str(guid)
278         self.glance = glance_utils.glance_client(self.os_creds)
279         self.image_creator = None
280
281         if self.image_metadata and 'glance_tests' in self.image_metadata:
282             glance_test_meta = self.image_metadata['glance_tests']
283         else:
284             glance_test_meta = None
285
286         self.tmp_dir = 'tmp/' + str(guid)
287         if not os.path.exists(self.tmp_dir):
288             os.makedirs(self.tmp_dir)
289
290         self.image_settings = openstack_tests.cirros_image_settings(
291             name=self.image_name,
292             image_metadata=glance_test_meta)
293
294     def tearDown(self):
295         """
296         Cleans the image and downloaded image file
297         """
298         if self.image_creator:
299             self.image_creator.clean()
300
301         if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
302             shutil.rmtree(self.tmp_dir)
303
304         super(self.__class__, self).__clean__()
305
306     def test_create_image_clean_url(self):
307         """
308         Tests the creation of an OpenStack image from a URL.
309         """
310         # Create Image
311         # Set the default image settings, then set any custom parameters sent
312         # from the app
313
314         self.image_creator = create_image.OpenStackImage(self.os_creds,
315                                                          self.image_settings)
316         created_image = self.image_creator.create()
317         self.assertIsNotNone(created_image)
318
319         retrieved_image = glance_utils.get_image(
320             self.glance, image_settings=self.image_settings)
321         self.assertIsNotNone(retrieved_image)
322         self.assertEqual(created_image.size, retrieved_image.size)
323         self.assertEqual(get_image_size(self.image_settings),
324                          retrieved_image.size)
325         self.assertEqual(created_image.name, retrieved_image.name)
326         self.assertEqual(created_image.id, retrieved_image.id)
327
328     def test_create_image_clean_url_properties(self):
329         """
330         Tests the creation of an OpenStack image from a URL and set properties.
331         """
332         # Create Image
333         # Set the default image settings, then set any custom parameters sent
334         # from the app
335         self.image_creator = create_image.OpenStackImage(self.os_creds,
336                                                          self.image_settings)
337         created_image = self.image_creator.create()
338         self.assertIsNotNone(created_image)
339
340         retrieved_image = glance_utils.get_image(
341             self.glance, image_settings=self.image_settings)
342         self.assertIsNotNone(retrieved_image)
343         self.assertEqual(self.image_creator.get_image().size,
344                          retrieved_image.size)
345         self.assertEqual(get_image_size(self.image_settings),
346                          retrieved_image.size)
347         self.assertEqual(created_image.name, retrieved_image.name)
348         self.assertEqual(created_image.id, retrieved_image.id)
349         self.assertEqual(created_image.properties, retrieved_image.properties)
350
351     def test_create_image_clean_file(self):
352         """
353         Tests the creation of an OpenStack image from a file.
354         """
355         if not self.image_settings.image_file and self.image_settings.url:
356             # Download the file of the image
357             image_file_name = file_utils.download(self.image_settings.url,
358                                                   self.tmp_dir).name
359         else:
360             image_file_name = self.image_settings.image_file
361
362         if image_file_name:
363             file_image_settings = openstack_tests.file_image_test_settings(
364                 name=self.image_name, file_path=image_file_name)
365
366             self.image_creator = create_image.OpenStackImage(
367                 self.os_creds, file_image_settings)
368             created_image = self.image_creator.create()
369             self.assertIsNotNone(created_image)
370             self.assertEqual(self.image_name, created_image.name)
371
372             retrieved_image = glance_utils.get_image(
373                 self.glance, image_settings=file_image_settings)
374             self.assertIsNotNone(retrieved_image)
375             self.assertEqual(self.image_creator.get_image().size,
376                              retrieved_image.size)
377             self.assertEqual(get_image_size(file_image_settings),
378                              retrieved_image.size)
379
380             self.assertEqual(created_image.name, retrieved_image.name)
381             self.assertEqual(created_image.id, retrieved_image.id)
382         else:
383             logger.warn(
384                 'Test not executed as the image metadata requires image files')
385
386     def test_create_delete_image(self):
387         """
388         Tests the creation then deletion of an OpenStack image to ensure
389         clean() does not raise an Exception.
390         """
391         # Create Image
392         self.image_creator = create_image.OpenStackImage(self.os_creds,
393                                                          self.image_settings)
394         created_image = self.image_creator.create()
395         self.assertIsNotNone(created_image)
396
397         retrieved_image = glance_utils.get_image(
398             self.glance, image_settings=self.image_settings)
399         self.assertIsNotNone(retrieved_image)
400         self.assertEqual(self.image_creator.get_image().size,
401                          retrieved_image.size)
402         self.assertEqual(get_image_size(self.image_settings),
403                          retrieved_image.size)
404
405         # Delete Image manually
406         glance_utils.delete_image(self.glance, created_image)
407
408         self.assertIsNone(glance_utils.get_image(
409             self.glance, image_settings=self.image_creator.image_settings))
410
411         # Must not throw an exception when attempting to cleanup non-existent
412         # image
413         self.image_creator.clean()
414         self.assertIsNone(self.image_creator.get_image())
415
416     def test_create_same_image(self):
417         """
418         Tests the creation of an OpenStack image when the image already exists.
419         """
420         # Create Image
421         self.image_creator = create_image.OpenStackImage(self.os_creds,
422                                                          self.image_settings)
423         image1 = self.image_creator.create()
424
425         retrieved_image = glance_utils.get_image(
426             self.glance, image_settings=self.image_settings)
427         self.assertIsNotNone(retrieved_image)
428         self.assertEqual(self.image_creator.get_image().size,
429                          retrieved_image.size)
430         self.assertEqual(get_image_size(self.image_settings),
431                          retrieved_image.size)
432         self.assertEqual(image1.name, retrieved_image.name)
433         self.assertEqual(image1.id, retrieved_image.id)
434         self.assertEqual(image1.properties, retrieved_image.properties)
435
436         # Should be retrieving the instance data
437         os_image_2 = create_image.OpenStackImage(self.os_creds,
438                                                  self.image_settings)
439         image2 = os_image_2.create()
440         self.assertEqual(image1.id, image2.id)
441
442     def test_create_same_image_new_settings(self):
443         """
444         Tests the creation of an OpenStack image when the image already exists
445         and the configuration only contains the name.
446         """
447         # Create Image
448         self.image_creator = create_image.OpenStackImage(self.os_creds,
449                                                          self.image_settings)
450         image1 = self.image_creator.create()
451
452         retrieved_image = glance_utils.get_image(
453             self.glance, image_settings=self.image_settings)
454         self.assertIsNotNone(retrieved_image)
455         self.assertEqual(self.image_creator.get_image().size,
456                          retrieved_image.size)
457         self.assertEqual(get_image_size(self.image_settings),
458                          retrieved_image.size)
459         self.assertEqual(image1.name, retrieved_image.name)
460         self.assertEqual(image1.id, retrieved_image.id)
461         self.assertEqual(image1.properties, retrieved_image.properties)
462
463         # Should be retrieving the instance data
464         image_2_settings = ImageSettings(name=self.image_settings.name,
465                                          image_user='foo', exists=True)
466         os_image_2 = create_image.OpenStackImage(self.os_creds,
467                                                  image_2_settings)
468         image2 = os_image_2.create()
469         self.assertEqual(image1.id, image2.id)
470
471
472 class CreateImageNegativeTests(OSIntegrationTestCase):
473     """
474     Negative test cases for the CreateImage class
475     """
476
477     def setUp(self):
478         super(self.__class__, self).__start__()
479
480         self.image_name = self.__class__.__name__ + '-' + str(uuid.uuid4())
481         self.image_creator = None
482
483     def tearDown(self):
484         if self.image_creator:
485             self.image_creator.clean()
486
487         super(self.__class__, self).__clean__()
488
489     def test_bad_image_name(self):
490         """
491         Expect an ImageCreationError when the image name does not exist when a
492         file or URL has not been configured
493         """
494         os_image_settings = ImageSettings(name='foo', image_user='bar',
495                                           exists=True)
496         self.image_creator = create_image.OpenStackImage(self.os_creds,
497                                                          os_image_settings)
498
499         with self.assertRaises(ImageCreationError):
500             self.image_creator.create()
501
502             self.fail('ImageCreationError should have been raised prior to'
503                       'this line')
504
505     def test_bad_image_url(self):
506         """
507         Expect an ImageCreationError when the image download url is bad
508         """
509         os_image_settings = openstack_tests.cirros_image_settings(
510             name=self.image_name)
511         self.image_creator = create_image.OpenStackImage(
512             self.os_creds,
513             create_image.ImageSettings(name=os_image_settings.name,
514                                        image_user=os_image_settings.image_user,
515                                        img_format=os_image_settings.format,
516                                        url="http://foo.bar"))
517
518         try:
519             self.image_creator.create()
520         except HTTPBadRequest:
521             pass
522         except URLError:
523             pass
524         except Exception as e:
525             self.fail('Invalid Exception ' + str(e))
526
527     def test_bad_image_image_type(self):
528         """
529         Expect an ImageCreationError when the image type bad
530         """
531         os_image_settings = openstack_tests.cirros_image_settings(
532             name=self.image_name)
533         self.image_creator = create_image.OpenStackImage(
534             self.os_creds,
535             create_image.ImageSettings(name=os_image_settings.name,
536                                        image_user=os_image_settings.image_user,
537                                        img_format='foo',
538                                        url=os_image_settings.url))
539
540         with self.assertRaises(Exception):
541             self.image_creator.create()
542
543     def test_bad_image_file(self):
544         """
545         Expect an ImageCreationError when the image file does not exist
546         """
547         os_image_settings = openstack_tests.cirros_image_settings(
548             name=self.image_name)
549         self.image_creator = create_image.OpenStackImage(
550             self.os_creds,
551             create_image.ImageSettings(name=os_image_settings.name,
552                                        image_user=os_image_settings.image_user,
553                                        img_format=os_image_settings.format,
554                                        image_file="/foo/bar.qcow"))
555         with self.assertRaises(IOError):
556             self.image_creator.create()
557
558
559 class CreateMultiPartImageTests(OSIntegrationTestCase):
560     """
561     Test different means for creating a 3-part images
562     """
563
564     def setUp(self):
565         """
566         Instantiates the CreateImage object that is responsible for
567         downloading and creating an OS image file within OpenStack
568         """
569         super(self.__class__, self).__start__()
570
571         guid = uuid.uuid4()
572         self.image_creators = list()
573         self.image_name = self.__class__.__name__ + '-' + str(guid)
574         self.glance = glance_utils.glance_client(self.os_creds)
575
576         self.tmp_dir = 'tmp/' + str(guid)
577         if not os.path.exists(self.tmp_dir):
578             os.makedirs(self.tmp_dir)
579
580         if self.image_metadata and 'glance_tests' in self.image_metadata:
581             self.glance_test_meta = self.image_metadata['glance_tests']
582         else:
583             self.glance_test_meta = dict()
584
585     def tearDown(self):
586         """
587         Cleans the images and downloaded image file
588         """
589         for image_creator in self.image_creators:
590             image_creator.clean()
591
592         if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
593             shutil.rmtree(self.tmp_dir)
594
595         super(self.__class__, self).__clean__()
596
597     def test_create_three_part_image_from_url(self):
598         """
599         Tests the creation of a 3-part OpenStack image from a URL.
600         """
601         # Create the kernel image
602         if 'disk_file' not in self.glance_test_meta:
603             image_settings = openstack_tests.cirros_image_settings(
604                 name=self.image_name,
605                 image_metadata={
606                     'disk_url':
607                         openstack_tests.CIRROS_DEFAULT_IMAGE_URL,
608                     'kernel_url':
609                         openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL,
610                     'ramdisk_url':
611                         openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL})
612
613             image_creator = create_image.OpenStackImage(self.os_creds,
614                                                         image_settings)
615             self.image_creators.append(image_creator)
616             image_creator.create()
617
618             main_image = glance_utils.get_image(self.glance,
619                                                 image_settings=image_settings)
620             self.assertIsNotNone(main_image)
621             self.assertIsNotNone(image_creator.get_image())
622             self.assertEqual(image_creator.get_image().id, main_image.id)
623
624             kernel_image = glance_utils.get_image(
625                 self.glance,
626                 image_settings=image_settings.kernel_image_settings)
627             self.assertIsNotNone(kernel_image)
628             self.assertIsNotNone(image_creator.get_kernel_image())
629             self.assertEqual(kernel_image.id,
630                              image_creator.get_kernel_image().id)
631
632             ramdisk_image = glance_utils.get_image(
633                 self.glance,
634                 image_settings=image_settings.ramdisk_image_settings)
635             self.assertIsNotNone(ramdisk_image)
636             self.assertIsNotNone(image_creator.get_ramdisk_image())
637             self.assertEqual(ramdisk_image.id,
638                              image_creator.get_ramdisk_image().id)
639         else:
640             logger.warn(
641                 'Test not executed as the image metadata requires image files')
642
643     def test_create_three_part_image_from_file_3_creators(self):
644         """
645         Tests the creation of a 3-part OpenStack image from files.
646         """
647         file_only = False
648
649         # Set properties
650         properties = {}
651         if self.glance_test_meta:
652             if 'extra_properties' in self.glance_test_meta:
653                 properties = self.glance_test_meta['extra_properties']
654             if 'disk_file' in self.glance_test_meta:
655                 file_only = True
656
657         # Create the kernel image
658         kernel_file_name = None
659         kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
660         if 'kernel_file' in self.glance_test_meta:
661             kernel_file_name = self.glance_test_meta['kernel_file']
662         elif 'kernel_url' in self.glance_test_meta:
663             kernel_url = self.glance_test_meta['kernel_url']
664         else:
665             kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
666
667         if not kernel_file_name and not file_only:
668             kernel_file_name = file_utils.download(kernel_url,
669                                                    self.tmp_dir).name
670         else:
671             logger.warn('Will not download the kernel image.'
672                         ' Cannot execute test')
673             return
674
675         kernel_file_image_settings = openstack_tests.file_image_test_settings(
676             name=self.image_name + '_kernel', file_path=kernel_file_name)
677
678         self.image_creators.append(create_image.OpenStackImage(
679             self.os_creds, kernel_file_image_settings))
680         kernel_image = self.image_creators[-1].create()
681         self.assertIsNotNone(kernel_image)
682         self.assertEqual(get_image_size(kernel_file_image_settings),
683                          kernel_image.size)
684
685         # Create the ramdisk image
686         ramdisk_file_name = None
687         ramdisk_url = openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL
688         if 'ramdisk_file' in self.glance_test_meta:
689             ramdisk_file_name = self.glance_test_meta['ramdisk_file']
690         elif 'ramdisk_url' in self.glance_test_meta:
691             ramdisk_url = self.glance_test_meta['ramdisk_url']
692
693         if not ramdisk_file_name and not file_only:
694             ramdisk_file_name = file_utils.download(ramdisk_url,
695                                                     self.tmp_dir).name
696         else:
697             logger.warn('Will not download the ramdisk image.'
698                         ' Cannot execute test')
699             return
700
701         ramdisk_file_image_settings = openstack_tests.file_image_test_settings(
702             name=self.image_name + '_ramdisk', file_path=ramdisk_file_name)
703         self.image_creators.append(create_image.OpenStackImage(
704             self.os_creds, ramdisk_file_image_settings))
705         ramdisk_image = self.image_creators[-1].create()
706         self.assertIsNotNone(ramdisk_image)
707         self.assertEqual(get_image_size(ramdisk_file_image_settings),
708                          ramdisk_image.size)
709
710         # Create the main disk image
711         disk_file_name = None
712         disk_url = openstack_tests.CIRROS_DEFAULT_IMAGE_URL
713         if 'disk_file' in self.glance_test_meta:
714             disk_file_name = self.glance_test_meta['disk_file']
715         elif 'disk_url' in self.glance_test_meta:
716             disk_url = self.glance_test_meta['disk_url']
717
718         if not disk_file_name and not file_only:
719             disk_file_name = file_utils.download(disk_url, self.tmp_dir).name
720         else:
721             logger.warn('Will not download the disk file image.'
722                         ' Cannot execute test')
723             return
724
725         file_image_settings = openstack_tests.file_image_test_settings(
726             name=self.image_name, file_path=disk_file_name)
727         properties['kernel_id'] = kernel_image.id
728         properties['ramdisk_id'] = ramdisk_image.id
729         file_image_settings.extra_properties = properties
730         self.image_creators.append(
731             create_image.OpenStackImage(self.os_creds, file_image_settings))
732         created_image = self.image_creators[-1].create()
733
734         self.assertIsNotNone(created_image)
735         self.assertEqual(self.image_name, created_image.name)
736
737         retrieved_image = glance_utils.get_image(
738             self.glance, image_settings=file_image_settings)
739         self.assertIsNotNone(retrieved_image)
740         self.assertEqual(self.image_creators[-1].get_image().size,
741                          retrieved_image.size)
742         self.assertEqual(get_image_size(file_image_settings),
743                          retrieved_image.size)
744         self.assertEqual(created_image.name, retrieved_image.name)
745         self.assertEqual(created_image.id, retrieved_image.id)
746         self.assertEqual(created_image.properties, retrieved_image.properties)
747
748     def test_create_three_part_image_from_url_3_creators(self):
749         """
750         Tests the creation of a 3-part OpenStack image from a URL.
751         """
752         if 'disk_file' not in self.glance_test_meta:
753             # Set properties
754             properties = {}
755             if self.glance_test_meta and \
756                     'extra_properties' in self.glance_test_meta:
757                 properties = self.glance_test_meta['extra_properties']
758
759             # Create the kernel image
760             kernel_image_settings = openstack_tests.cirros_image_settings(
761                 name=self.image_name + '_kernel',
762                 url=openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL)
763
764             if self.glance_test_meta:
765                 if 'kernel_url' in self.glance_test_meta:
766                     kernel_image_settings.url = self.glance_test_meta[
767                         'kernel_url']
768             self.image_creators.append(
769                 create_image.OpenStackImage(self.os_creds,
770                                             kernel_image_settings))
771             kernel_image = self.image_creators[-1].create()
772             self.assertIsNotNone(kernel_image)
773             self.assertEqual(get_image_size(kernel_image_settings),
774                              kernel_image.size)
775
776             # Create the ramdisk image
777             ramdisk_image_settings = openstack_tests.cirros_image_settings(
778                 name=self.image_name + '_ramdisk',
779                 url=openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL)
780             if self.glance_test_meta:
781                 if 'ramdisk_url' in self.glance_test_meta:
782                     ramdisk_image_settings.url = self.glance_test_meta[
783                         'ramdisk_url']
784             self.image_creators.append(
785                 create_image.OpenStackImage(self.os_creds,
786                                             ramdisk_image_settings))
787             ramdisk_image = self.image_creators[-1].create()
788             self.assertIsNotNone(ramdisk_image)
789             self.assertEqual(get_image_size(ramdisk_image_settings),
790                              ramdisk_image.size)
791
792             # Create the main image
793             os_image_settings = openstack_tests.cirros_image_settings(
794                 name=self.image_name,
795                 url=openstack_tests.CIRROS_DEFAULT_IMAGE_URL)
796             if self.glance_test_meta:
797                 if 'disk_url' in self.glance_test_meta:
798                     os_image_settings.url = self.glance_test_meta['disk_url']
799
800             properties['kernel_id'] = kernel_image.id
801             properties['ramdisk_id'] = ramdisk_image.id
802             os_image_settings.extra_properties = properties
803
804             self.image_creators.append(
805                 create_image.OpenStackImage(self.os_creds, os_image_settings))
806             created_image = self.image_creators[-1].create()
807             self.assertIsNotNone(created_image)
808             self.assertEqual(self.image_name, created_image.name)
809
810             retrieved_image = glance_utils.get_image(
811                 self.glance, image_settings=os_image_settings)
812             self.assertIsNotNone(retrieved_image)
813
814             self.assertEqual(self.image_creators[-1].get_image().size,
815                              retrieved_image.size)
816             self.assertEqual(get_image_size(os_image_settings),
817                              retrieved_image.size)
818
819             self.assertEqual(created_image.name, retrieved_image.name)
820             self.assertEqual(created_image.id, retrieved_image.id)
821             self.assertEqual(created_image.properties,
822                              retrieved_image.properties)
823         else:
824             logger.warn(
825                 'Test not executed as the image metadata requires image files')
826
827
828 def get_image_size(image_settings):
829     """
830     Returns the expected image size
831     :return:
832     """
833     if image_settings.image_file:
834         return os.path.getsize(image_settings.image_file)
835     elif image_settings.url:
836         return int(file_utils.get_content_length(image_settings.url))
837     else:
838         raise Exception(
839             'Cannot retrieve expected image size. Image filename or URL has '
840             'not been configured')