Added support for Glance v2
[snaps.git] / snaps / openstack / tests / create_image_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import os
16 import shutil
17 import uuid
18 import unittest
19
20 from snaps import file_utils
21 from snaps.openstack.create_image import ImageSettings
22
23 import openstack_tests
24 from snaps.openstack.utils import glance_utils
25 from snaps.openstack import create_image
26 from snaps.openstack import os_credentials
27 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
28
29 __author__ = 'spisarski'
30
31
class ImageSettingsUnitTests(unittest.TestCase):
    """
    Tests the construction of the ImageSettings class

    The negative tests show that name, image_user and format are not sufficient on their own and that
    supplying both a download URL and an image file is rejected. The positive tests check that
    constructor arguments (keyword or 'config' dict form) are exposed unchanged as attributes.
    """

    def test_no_params(self):
        """Construction without any arguments must raise."""
        with self.assertRaises(Exception):
            ImageSettings()

    def test_empty_config(self):
        """Construction from an empty config dict must raise."""
        with self.assertRaises(Exception):
            ImageSettings(config=dict())

    def test_name_only(self):
        """A name alone is not sufficient."""
        with self.assertRaises(Exception):
            ImageSettings(name='foo')

    def test_config_with_name_only(self):
        """A config dict holding only a name is not sufficient."""
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo'})

    def test_name_user_only(self):
        """Name and image user without a format/source must raise."""
        with self.assertRaises(Exception):
            ImageSettings(name='foo', image_user='bar')

    def test_config_with_name_user_only(self):
        """Config form of test_name_user_only."""
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar'})

    def test_name_user_format_only(self):
        """Name, user and format without a URL or file must raise."""
        with self.assertRaises(Exception):
            ImageSettings(name='foo', image_user='bar', img_format='qcow2')

    def test_config_with_name_user_format_only(self):
        """Config form of test_name_user_format_only."""
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2'})

    def test_name_user_format_url_file_only(self):
        """Supplying both a URL and an image file must raise."""
        with self.assertRaises(Exception):
            ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                          image_file='/foo/bar.qcow')

    def test_config_with_name_user_format_url_file_only(self):
        """Config form of test_name_user_format_url_file_only."""
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                  'download_url': 'http://foo.com', 'image_file': '/foo/bar.qcow'})

    def test_name_user_format_url_only(self):
        """Minimal valid URL-based settings built from keyword arguments."""
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertIsNone(settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_name_user_format_url_only_properties(self):
        """Minimal valid URL-based settings plus extra properties."""
        properties = {'hw_video_model': 'vga'}
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                                 extra_properties=properties)
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertEqual(properties, settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_config_with_name_user_format_url_only(self):
        """Minimal valid URL-based settings built from a config dict ('download_url' maps to url)."""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'download_url': 'http://foo.com'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertIsNone(settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_name_user_format_file_only(self):
        """Minimal valid file-based settings built from keyword arguments."""
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_config_with_name_user_format_file_only(self):
        """Minimal valid file-based settings built from a config dict."""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'image_file': '/foo/bar.qcow'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_all_url(self):
        """URL-based settings with every optional keyword argument set."""
        properties = {'hw_video_model': 'vga'}
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                                 extra_properties=properties, nic_config_pb_loc='/foo/bar')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertEqual(properties, settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)

    def test_config_all_url(self):
        """URL-based settings with every optional config key set; extra_properties is passed through as given."""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'download_url': 'http://foo.com',
                                         'extra_properties': '{\'hw_video_model\': \'vga\'}',
                                         'nic_config_pb_loc': '/foo/bar'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertEqual('{\'hw_video_model\': \'vga\'}', settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)

    def test_all_file(self):
        """File-based settings with every optional keyword argument set."""
        properties = {'hw_video_model': 'vga'}
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow',
                                 extra_properties=properties, nic_config_pb_loc='/foo/bar')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertEqual(properties, settings.extra_properties)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)

    def test_config_all_file(self):
        """File-based settings with every optional config key set; extra_properties is passed through as given."""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'image_file': '/foo/bar.qcow',
                                         'extra_properties': '{\'hw_video_model\' : \'vga\'}',
                                         'nic_config_pb_loc': '/foo/bar'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertEqual('{\'hw_video_model\' : \'vga\'}', settings.extra_properties)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
178
179
class CreateImageSuccessTests(OSIntegrationTestCase):
    """
    Test for the CreateImage class defined in create_image.py

    Every test optionally honors self.image_metadata, which may override the disk URL, supply extra image
    properties and provide kernel/ramdisk URLs that turn the image under test into a 3-part image.
    """

    def setUp(self):
        """
        Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
        within OpenStack
        """
        # Name the class explicitly: super(self.__class__, self) recurses forever as soon as this class is subclassed
        super(CreateImageSuccessTests, self).__start__()

        guid = uuid.uuid4()
        self.image_name = self.__class__.__name__ + '-' + str(guid)
        self.glance = glance_utils.glance_client(self.os_creds)
        self.image_creators = list()

        self.tmp_dir = 'tmp/' + str(guid)
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

    def tearDown(self):
        """
        Cleans the image and downloaded image file
        """
        try:
            # Clean in reverse creation order so the main image goes before its kernel/ramdisk parts
            while self.image_creators:
                self.image_creators.pop().clean()
        finally:
            # Always remove the download directory and run the base cleanup, even when an image clean() fails
            if os.path.isdir(self.tmp_dir):
                shutil.rmtree(self.tmp_dir)

            super(CreateImageSuccessTests, self).__clean__()

    def _metadata_value(self, key):
        """
        Returns the value stored under key in self.image_metadata or None when the metadata or the value is unset
        """
        if self.image_metadata and key in self.image_metadata and self.image_metadata[key]:
            return self.image_metadata[key]
        return None

    def _override_disk_url(self, image_settings):
        """
        Replaces image_settings.url with the 'disk_url' metadata value when one has been configured
        """
        disk_url = self._metadata_value('disk_url')
        if disk_url:
            image_settings.url = disk_url

    def _override_extra_properties(self, image_settings):
        """
        Replaces image_settings.extra_properties with a copy of the 'extra_properties' metadata when configured
        """
        metadata_properties = self._metadata_value('extra_properties')
        if metadata_properties:
            # Copy so the kernel_id/ramdisk_id entries added later do not end up in the metadata dict itself
            image_settings.extra_properties = dict(metadata_properties)

    def _cirros_url_settings(self):
        """
        Returns the default cirros URL image settings for this test with the metadata overrides applied
        """
        image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self._override_disk_url(image_settings)
        self._override_extra_properties(image_settings)
        return image_settings

    def _create_image(self, image_settings):
        """
        Creates an image from image_settings, registers its creator for tearDown and returns the created image
        """
        creator = create_image.OpenStackImage(self.os_creds, image_settings)
        self.image_creators.append(creator)
        image = creator.create()
        self.assertIsNotNone(image)
        return image

    def _create_kernel_and_ramdisk(self, image_settings, from_file=False):
        """
        Creates the kernel and ramdisk images of a 3-part image when their URLs are present in the metadata and
        stores their IDs in image_settings.extra_properties under 'kernel_id' / 'ramdisk_id'
        :param image_settings: the settings of the main image that will reference the parts
        :param from_file: when True each part is downloaded to self.tmp_dir and created from that file, else the
                          part is created directly from its URL
        """
        for part in ('kernel', 'ramdisk'):
            part_url = self._metadata_value(part + '_url')
            if not part_url:
                continue

            part_name = self.image_name + '_' + part
            if from_file:
                part_file = file_utils.download(part_url, self.tmp_dir)
                part_settings = openstack_tests.file_image_test_settings(name=part_name, file_path=part_file.name)
            else:
                part_settings = openstack_tests.cirros_url_image(name=part_name, url=part_url)

            part_image = self._create_image(part_settings)
            self.assertEqual(get_image_size(part_settings), part_image.size)

            # Guard for settings whose extra_properties has not been populated yet
            if not image_settings.extra_properties:
                image_settings.extra_properties = dict()
            image_settings.extra_properties[part + '_id'] = part_image.id

    def _retrieve_and_check_size(self, image_settings, expected_size):
        """
        Looks up the image by name in Glance, asserts it exists with the expected size and returns it
        """
        retrieved_image = glance_utils.get_image(self.glance, image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(expected_size, retrieved_image.size)
        self.assertEqual(get_image_size(image_settings), retrieved_image.size)
        return retrieved_image

    def test_create_image_clean_url(self):
        """
        Tests the creation of an OpenStack image from a URL.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = self._cirros_url_settings()

        # If this is a 3-part image create the kernel and ramdisk images first
        self._create_kernel_and_ramdisk(os_image_settings)

        created_image = self._create_image(os_image_settings)

        retrieved_image = self._retrieve_and_check_size(os_image_settings, created_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_image_clean_url_properties(self):
        """
        Tests the creation of an OpenStack image from a URL and set properties.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self._override_disk_url(os_image_settings)

        # Start from the test's own property; metadata properties win on key collisions.
        # copy + update replaces the Python 2 only "a.items() + b.items()" concatenation.
        properties = {'hw_video_model': 'vga'}
        metadata_properties = self._metadata_value('extra_properties')
        if metadata_properties:
            properties.update(metadata_properties)
        os_image_settings.extra_properties = properties

        # If this is a 3-part image create the kernel and ramdisk images first
        self._create_kernel_and_ramdisk(os_image_settings)

        created_image = self._create_image(os_image_settings)

        retrieved_image = self._retrieve_and_check_size(
            os_image_settings, self.image_creators[-1].get_image().size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_image_clean_file(self):
        """
        Tests the creation of an OpenStack image from a file.
        """
        # The URL settings only serve to locate the file to download, hence the throw-away name
        url_image_settings = openstack_tests.cirros_url_image('foo')
        self._override_disk_url(url_image_settings)

        # Download the file of the image
        image_file = file_utils.download(url_image_settings.url, self.tmp_dir)
        file_image_settings = openstack_tests.file_image_test_settings(name=self.image_name, file_path=image_file.name)

        # Set extra properties sent from the app (if any)
        self._override_extra_properties(file_image_settings)

        # If this is a 3-part image create the kernel and ramdisk images first
        self._create_kernel_and_ramdisk(file_image_settings, from_file=True)

        created_image = self._create_image(file_image_settings)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = self._retrieve_and_check_size(
            file_image_settings, self.image_creators[-1].get_image().size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_delete_image(self):
        """
        Tests the creation then deletion of an OpenStack image to ensure clean() does not raise an Exception.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = self._cirros_url_settings()

        # If this is a 3-part image create the kernel and ramdisk images first
        self._create_kernel_and_ramdisk(os_image_settings)

        created_image = self._create_image(os_image_settings)
        self._retrieve_and_check_size(os_image_settings, self.image_creators[-1].get_image().size)

        # Delete Image manually
        glance_utils.delete_image(self.glance, created_image)

        self.assertIsNone(glance_utils.get_image(self.glance, self.image_creators[-1].image_settings.name))

        # Must not throw an exception when attempting to cleanup non-existent image
        self.image_creators[-1].clean()
        self.assertIsNone(self.image_creators[-1].get_image())
        self.image_creators.pop()

    def test_create_same_image(self):
        """
        Tests the creation of an OpenStack image when the image already exists.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = self._cirros_url_settings()

        # If this is a 3-part image create the kernel and ramdisk images first
        self._create_kernel_and_ramdisk(os_image_settings)

        image1 = self._create_image(os_image_settings)

        retrieved_image = self._retrieve_and_check_size(
            os_image_settings, self.image_creators[-1].get_image().size)
        self.assertEqual(image1.name, retrieved_image.name)
        self.assertEqual(image1.id, retrieved_image.id)
        self.assertEqual(image1.properties, retrieved_image.properties)

        # Should be retrieving the instance data; not registered for cleanup as it refers to the same image
        os_image_2 = create_image.OpenStackImage(self.os_creds, os_image_settings)
        image2 = os_image_2.create()
        self.assertEqual(image1.id, image2.id)
464
465
class CreateImageNegativeTests(OSIntegrationTestCase):
    """
    Negative test cases for the CreateImage class
    """

    def setUp(self):
        """
        Initializes the base test case and generates a unique image name
        """
        # Name the class explicitly: super(self.__class__, self) recurses forever as soon as this class is subclassed
        super(CreateImageNegativeTests, self).__start__()

        self.image_name = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.image_creator = None

    def tearDown(self):
        """
        Cleans the image creator (when one was instantiated) and the base test case
        """
        if self.image_creator:
            self.image_creator.clean()

        super(CreateImageNegativeTests, self).__clean__()

    def _assert_creds_rejected(self, username, password, auth_url, project_name):
        """
        Asserts that instantiating or creating an image with the given credential values raises an Exception
        """
        # Everything that is not under test is resolved outside the assertRaises block so that an unrelated
        # error (e.g. a mistyped attribute name) cannot satisfy the assertion
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        proxy_settings = self.os_creds.proxy_settings

        with self.assertRaises(Exception):
            self.image_creator = create_image.OpenStackImage(
                os_credentials.OSCreds(username, password, auth_url, project_name, proxy_settings=proxy_settings),
                os_image_settings)
            self.image_creator.create()

    def test_none_image_name(self):
        """
        Expect an exception when the image name is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        # No self.fail() inside the block: it raises AssertionError, which assertRaises(Exception) would accept
        # and thereby hide a missing validation error
        with self.assertRaises(Exception):
            self.image_creator = create_image.OpenStackImage(
                self.os_creds, create_image.ImageSettings(
                    name=None, image_user=os_image_settings.image_user, img_format=os_image_settings.format,
                    url=os_image_settings.url))

    def test_bad_image_url(self):
        """
        Expect an exception when the image download url is bad
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self.image_creator = create_image.OpenStackImage(self.os_creds, create_image.ImageSettings(
            name=os_image_settings.name, image_user=os_image_settings.image_user,
            img_format=os_image_settings.format, url="http://foo.bar"))
        with self.assertRaises(Exception):
            self.image_creator.create()

    def test_bad_image_file(self):
        """
        Expect an exception when the image file does not exist
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self.image_creator = create_image.OpenStackImage(
            self.os_creds,
            create_image.ImageSettings(name=os_image_settings.name, image_user=os_image_settings.image_user,
                                       img_format=os_image_settings.format, image_file="/foo/bar.qcow"))
        with self.assertRaises(Exception):
            self.image_creator.create()

    def test_none_proj_name(self):
        """
        Expect an exception when the project name is None
        """
        self._assert_creds_rejected(
            self.os_creds.username, self.os_creds.password, self.os_creds.auth_url, None)

    def test_none_auth_url(self):
        """
        Expect an exception when the authentication URL is None
        """
        self._assert_creds_rejected(
            self.os_creds.username, self.os_creds.password, None, self.os_creds.project_name)

    def test_none_password(self):
        """
        Expect an exception when the password is None
        """
        # NOTE(review): reads 'auth_url' as the sibling tests do (was 'os_auth_url') - confirm against OSCreds
        self._assert_creds_rejected(
            self.os_creds.username, None, self.os_creds.auth_url, self.os_creds.project_name)

    def test_none_user(self):
        """
        Expect an exception when the username is None
        """
        # NOTE(review): reads 'auth_url' as the sibling tests do (was 'os_auth_url') - confirm against OSCreds
        self._assert_creds_rejected(
            None, self.os_creds.password, self.os_creds.auth_url, self.os_creds.project_name)
565
566
class CreateMultiPartImageTests(OSIntegrationTestCase):
    """
    Tests for creating a 3-part image (kernel, ramdisk and main disk image)
    """

    # Default CirrOS download locations, used when self.image_metadata does not supply an override
    _DEFAULT_KERNEL_URL = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-kernel'
    _DEFAULT_RAMDISK_URL = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs'
    _DEFAULT_DISK_URL = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img'

    def setUp(self):
        """
        Runs the parent's __start__() and then prepares the per-test state: an empty list of image creators,
        a unique image name, a glance client and a unique temporary download directory
        """
        # Name the class explicitly: super(self.__class__, self) recurses forever once this class is subclassed
        super(CreateMultiPartImageTests, self).__start__()

        guid = uuid.uuid4()
        self.image_creators = list()
        self.image_name = self.__class__.__name__ + '-' + str(guid)
        self.glance = glance_utils.glance_client(self.os_creds)

        self.tmp_dir = 'tmp/' + str(guid)
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

    def tearDown(self):
        """
        Cleans the images and downloaded image files
        """
        try:
            # Clean in reverse creation order so the main image goes before the kernel/ramdisk it refers to
            while self.image_creators:
                self.image_creators[-1].clean()
                self.image_creators.pop()
        finally:
            # Runs even when a clean() call raised, so the download directory and the parent's state
            # are not left behind by a failed image cleanup
            if os.path.isdir(self.tmp_dir):
                shutil.rmtree(self.tmp_dir)

            super(CreateMultiPartImageTests, self).__clean__()

    def _metadata_value(self, key, default=None):
        """
        Looks up an optional override in self.image_metadata
        :param key: the metadata key to look up
        :param default: the value to return when no usable override exists
        :return: self.image_metadata[key] when the metadata is set and holds a truthy value for key, else default
        """
        metadata = self.image_metadata
        if metadata and key in metadata and metadata[key]:
            return metadata[key]
        return default

    def _extra_properties(self):
        """
        Returns the image properties configured under the 'extra_properties' metadata key
        :return: a new dict, empty when no extra properties are configured
        """
        # Copy: the tests add kernel_id/ramdisk_id to the result, which must not leak into image_metadata
        return dict(self._metadata_value('extra_properties', {}))

    def _create_image(self, image_settings):
        """
        Creates an image through a new OpenStackImage creator that is tracked for cleanup
        :param image_settings: the settings of the image to create
        :return: the object returned by the creator's create()
        """
        # Track the creator before calling create() so tearDown() also cleans a creator whose create() raised
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, image_settings))
        return self.image_creators[-1].create()

    def _create_component_image(self, image_settings):
        """
        Creates a kernel or ramdisk image and checks that it exists with the expected size
        :param image_settings: the settings of the image to create
        :return: the created image
        """
        image = self._create_image(image_settings)
        self.assertIsNotNone(image)
        self.assertEqual(get_image_size(image_settings), image.size)
        return image

    def _assert_main_image(self, created_image, image_settings):
        """
        Checks the main image against its settings and against the image glance returns for the same name
        :param created_image: the image returned when the main image was created (must be the last one created)
        :param image_settings: the settings the main image was created with
        """
        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(self.glance, image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(image_settings), retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)

    def test_create_three_part_image_from_url(self):
        """
        Tests the creation of a 3-part OpenStack image from a URL.
        """
        properties = self._extra_properties()

        # Create the kernel image
        kernel_image_settings = openstack_tests.cirros_url_image(
            name=self.image_name + '_kernel',
            url=self._metadata_value('kernel_url', self._DEFAULT_KERNEL_URL))
        kernel_image = self._create_component_image(kernel_image_settings)

        # Create the ramdisk image
        ramdisk_image_settings = openstack_tests.cirros_url_image(
            name=self.image_name + '_ramdisk',
            url=self._metadata_value('ramdisk_url', self._DEFAULT_RAMDISK_URL))
        ramdisk_image = self._create_component_image(ramdisk_image_settings)

        # Create the main image, linked to the kernel and ramdisk through its properties
        os_image_settings = openstack_tests.cirros_url_image(
            name=self.image_name,
            url=self._metadata_value('disk_url', self._DEFAULT_DISK_URL))
        properties['kernel_id'] = kernel_image.id
        properties['ramdisk_id'] = ramdisk_image.id
        os_image_settings.extra_properties = properties

        created_image = self._create_image(os_image_settings)
        self._assert_main_image(created_image, os_image_settings)

    def test_create_three_part_image_from_file(self):
        """
        Tests the creation of a 3-part OpenStack image from files.
        """
        properties = self._extra_properties()

        # Create the kernel image
        kernel_image_file = file_utils.download(
            self._metadata_value('kernel_url', self._DEFAULT_KERNEL_URL), self.tmp_dir)
        kernel_file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name + '_kernel', file_path=kernel_image_file.name)
        kernel_image = self._create_component_image(kernel_file_image_settings)

        # Create the ramdisk image
        ramdisk_image_file = file_utils.download(
            self._metadata_value('ramdisk_url', self._DEFAULT_RAMDISK_URL), self.tmp_dir)
        ramdisk_file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name + '_ramdisk', file_path=ramdisk_image_file.name)
        ramdisk_image = self._create_component_image(ramdisk_file_image_settings)

        # Create the main image; 'disk_url' is honoured here exactly as in the URL based test
        image_file = file_utils.download(
            self._metadata_value('disk_url', self._DEFAULT_DISK_URL), self.tmp_dir)
        file_image_settings = openstack_tests.file_image_test_settings(name=self.image_name, file_path=image_file.name)
        properties['kernel_id'] = kernel_image.id
        properties['ramdisk_id'] = ramdisk_image.id
        file_image_settings.extra_properties = properties

        created_image = self._create_image(file_image_settings)
        self._assert_main_image(created_image, file_image_settings)
717
718
def get_image_size(image_settings):
    """
    Returns the size the image described by the given settings is expected to have
    :param image_settings: object exposing the 'image_file' and 'url' attributes
    :return: the size of the local file when 'image_file' is set, otherwise the value reported by
             file_utils.get_content_length() for 'url' converted to an int
    :raise Exception: when neither 'image_file' nor 'url' is set
    """
    # A local file takes precedence over the URL
    local_path = image_settings.image_file
    if local_path:
        return os.path.getsize(local_path)

    remote_url = image_settings.url
    if not remote_url:
        raise Exception('Cannot retrieve expected image size. Image filename or URL has not been configured')

    return int(file_utils.get_content_length(remote_url))