1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
20 from snaps import file_utils
21 from snaps.openstack.create_image import ImageSettings
23 import openstack_tests
24 from snaps.openstack.utils import glance_utils
25 from snaps.openstack import create_image
26 from snaps.openstack import os_credentials
27 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
29 __author__ = 'spisarski'
class ImageSettingsUnitTests(unittest.TestCase):
    """
    Tests the construction of the ImageSettings class
    """

    # --- Invalid combinations: construction is expected to raise ---

    def test_no_params(self):
        """Construction without any arguments must be rejected"""
        with self.assertRaises(Exception):
            ImageSettings()

    def test_empty_config(self):
        """Construction from an empty config dict must be rejected"""
        with self.assertRaises(Exception):
            ImageSettings(config=dict())

    def test_name_only(self):
        """A name alone is not sufficient"""
        with self.assertRaises(Exception):
            ImageSettings(name='foo')

    def test_config_with_name_only(self):
        """A config holding only a name is not sufficient"""
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo'})

    def test_name_user_only(self):
        """Name and user without a format and source are not sufficient"""
        with self.assertRaises(Exception):
            ImageSettings(name='foo', image_user='bar')

    def test_config_with_name_user_only(self):
        """A config with only name and user is not sufficient"""
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar'})

    def test_name_user_format_only(self):
        """Name, user and format without a URL or file are not sufficient"""
        with self.assertRaises(Exception):
            ImageSettings(name='foo', image_user='bar', img_format='qcow2')

    def test_config_with_name_user_format_only(self):
        """A config with name, user and format but no URL or file is not sufficient"""
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2'})

    def test_name_user_format_url_file_only(self):
        """Supplying both a URL and a file must be rejected"""
        with self.assertRaises(Exception):
            ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                          image_file='/foo/bar.qcow')

    def test_config_with_name_user_format_url_file_only(self):
        """A config supplying both a download URL and a file must be rejected"""
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                  'download_url': 'http://foo.com', 'image_file': '/foo/bar.qcow'})

    # --- Valid combinations: attributes must reflect the inputs ---

    def test_name_user_format_url_only(self):
        """Minimal URL-based settings supplied as keyword arguments"""
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertIsNone(settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_name_user_format_url_only_properties(self):
        """Minimal URL-based settings plus extra properties"""
        properties = {'hw_video_model': 'vga'}
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                                 extra_properties=properties)
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertEqual(properties, settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_config_with_name_user_format_url_only(self):
        """Minimal URL-based settings supplied through a config dict"""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'download_url': 'http://foo.com'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertIsNone(settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_name_user_format_file_only(self):
        """Minimal file-based settings supplied as keyword arguments"""
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_config_with_name_user_format_file_only(self):
        """Minimal file-based settings supplied through a config dict"""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'image_file': '/foo/bar.qcow'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_all_url(self):
        """Every supported keyword argument with a URL source"""
        properties = {'hw_video_model': 'vga'}
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                                 extra_properties=properties, nic_config_pb_loc='/foo/bar')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertEqual(properties, settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)

    def test_config_all_url(self):
        """Every supported config key with a URL source; extra_properties is passed through as given"""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'download_url': 'http://foo.com',
                                         'extra_properties': '{\'hw_video_model\': \'vga\'}',
                                         'nic_config_pb_loc': '/foo/bar'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertEqual('{\'hw_video_model\': \'vga\'}', settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)

    def test_all_file(self):
        """Every supported keyword argument with a file source"""
        properties = {'hw_video_model': 'vga'}
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow',
                                 extra_properties=properties, nic_config_pb_loc='/foo/bar')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertEqual(properties, settings.extra_properties)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)

    def test_config_all_file(self):
        """Every supported config key with a file source; extra_properties is passed through as given"""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'image_file': '/foo/bar.qcow',
                                         'extra_properties': '{\'hw_video_model\' : \'vga\'}',
                                         'nic_config_pb_loc': '/foo/bar'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertEqual('{\'hw_video_model\' : \'vga\'}', settings.extra_properties)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
class CreateImageSuccessTests(OSIntegrationTestCase):
    """
    Test for the CreateImage class defined in create_image.py
    """

    def setUp(self):
        """
        Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
        within OpenStack
        """
        # Name the class explicitly: super(self.__class__, self) recurses forever once this class is subclassed
        super(CreateImageSuccessTests, self).__start__()

        guid = uuid.uuid4()
        self.image_name = self.__class__.__name__ + '-' + str(guid)
        self.glance = glance_utils.glance_client(self.os_creds)
        self.image_creators = list()

        self.tmp_dir = 'tmp/' + str(guid)
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

    def tearDown(self):
        """
        Cleans the image and downloaded image file
        """
        # Clean in reverse creation order so the main image goes before its kernel/ramdisk images
        while self.image_creators:
            self.image_creators[-1].clean()
            self.image_creators.pop()

        if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
            shutil.rmtree(self.tmp_dir)

        super(CreateImageSuccessTests, self).__clean__()

    def _metadata_value(self, key):
        """
        Returns the value held by self.image_metadata for key, or None when the metadata is unset, does not
        contain the key, or holds a falsy value for it
        """
        if self.image_metadata and key in self.image_metadata and self.image_metadata[key]:
            return self.image_metadata[key]
        return None

    def _apply_metadata(self, image_settings):
        """
        Overrides the URL and extra properties of image_settings with the 'disk_url' and 'extra_properties'
        values sent from the app (if any)
        """
        disk_url = self._metadata_value('disk_url')
        if disk_url:
            image_settings.url = disk_url

        extra_properties = self._metadata_value('extra_properties')
        if extra_properties:
            # Copy so that IDs added later never leak into the metadata shared between tests
            image_settings.extra_properties = dict(extra_properties)

    def _create_image(self, image_settings):
        """
        Creates an image from image_settings and returns the object returned by create(). The creator is
        registered before create() is called so tearDown still cleans up when creation fails
        """
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, image_settings))
        return self.image_creators[-1].create()

    def _create_support_images(self, image_settings, from_file=False):
        """
        If the metadata describes a 3-part image, creates the kernel and ramdisk images and records their IDs
        on image_settings as the 'kernel_id' and 'ramdisk_id' extra properties
        :param image_settings: the settings of the main image that will reference the created images
        :param from_file: when True each image is downloaded to self.tmp_dir and created from that file,
                          otherwise it is created directly from its URL
        """
        for part in ('kernel', 'ramdisk'):
            part_url = self._metadata_value(part + '_url')
            if not part_url:
                continue

            part_name = self.image_name + '_' + part
            if from_file:
                part_file = file_utils.download(part_url, self.tmp_dir)
                part_settings = openstack_tests.file_image_test_settings(name=part_name, file_path=part_file.name)
            else:
                part_settings = openstack_tests.cirros_url_image(name=part_name, url=part_url)

            part_image = self._create_image(part_settings)
            self.assertIsNotNone(part_image)
            self.assertEqual(get_image_size(part_settings), part_image.size)

            # Settings built without extra properties may not hold a dict yet
            if image_settings.extra_properties is None:
                image_settings.extra_properties = dict()
            image_settings.extra_properties[part + '_id'] = part_image.id

    def test_create_image_clean_url(self):
        """
        Tests the creation of an OpenStack image from a URL.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self._apply_metadata(os_image_settings)

        # If this is a 3-part image create the kernel and ramdisk images first
        self._create_support_images(os_image_settings)

        created_image = self._create_image(os_image_settings)
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(created_image.size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_image_clean_url_properties(self):
        """
        Tests the creation of an OpenStack image from a URL and set properties.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        os_image_settings.extra_properties = {'hw_video_model': 'vga'}

        disk_url = self._metadata_value('disk_url')
        if disk_url:
            os_image_settings.url = disk_url

        # Properties sent from the app win over the default set above. update() is used because
        # concatenating dict.items() results only works on Python 2
        metadata_properties = self._metadata_value('extra_properties')
        if metadata_properties:
            os_image_settings.extra_properties.update(metadata_properties)

        # If this is a 3-part image create the kernel and ramdisk images first
        self._create_support_images(os_image_settings)

        created_image = self._create_image(os_image_settings)
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_image_clean_file(self):
        """
        Tests the creation of an OpenStack image from a file.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        url_image_settings = openstack_tests.cirros_url_image('foo')
        disk_url = self._metadata_value('disk_url')
        if disk_url:
            url_image_settings.url = disk_url

        # Download the file of the image
        image_file = file_utils.download(url_image_settings.url, self.tmp_dir)
        file_image_settings = openstack_tests.file_image_test_settings(name=self.image_name, file_path=image_file.name)

        # Set extra properties sent from the app (if any); copied so the shared metadata is never mutated
        metadata_properties = self._metadata_value('extra_properties')
        if metadata_properties:
            file_image_settings.extra_properties = dict(metadata_properties)

        # If this is a 3-part image create the kernel and ramdisk images first
        self._create_support_images(file_image_settings, from_file=True)

        created_image = self._create_image(file_image_settings)
        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)

        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_delete_image(self):
        """
        Tests the creation then deletion of an OpenStack image to ensure clean() does not raise an Exception.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self._apply_metadata(os_image_settings)

        # If this is a 3-part image create the kernel and ramdisk images first
        self._create_support_images(os_image_settings)

        created_image = self._create_image(os_image_settings)
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)

        # Delete Image manually
        glance_utils.delete_image(self.glance, created_image)

        self.assertIsNone(glance_utils.get_image(self.glance, self.image_creators[-1].image_settings.name))

        # Must not throw an exception when attempting to cleanup non-existent image
        self.image_creators[-1].clean()
        self.assertIsNone(self.image_creators[-1].get_image())
        self.image_creators.pop()

    def test_create_same_image(self):
        """
        Tests the creation of an OpenStack image when the image already exists.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self._apply_metadata(os_image_settings)

        # If this is a 3-part image create the kernel and ramdisk images first
        self._create_support_images(os_image_settings)

        image1 = self._create_image(os_image_settings)

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)
        self.assertEqual(image1.name, retrieved_image.name)
        self.assertEqual(image1.id, retrieved_image.id)
        self.assertEqual(image1.properties, retrieved_image.properties)

        # Should be retrieving the instance data
        os_image_2 = create_image.OpenStackImage(self.os_creds, os_image_settings)
        image2 = os_image_2.create()
        self.assertEqual(image1.id, image2.id)
class CreateImageNegativeTests(OSIntegrationTestCase):
    """
    Negative test cases for the CreateImage class
    """

    def setUp(self):
        """
        Generates a unique image name; each test assigns self.image_creator itself
        """
        # Name the class explicitly: super(self.__class__, self) recurses forever once this class is subclassed
        super(CreateImageNegativeTests, self).__start__()

        self.image_name = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.image_creator = None

    def tearDown(self):
        """
        Cleans the image creator when a test managed to instantiate one
        """
        if self.image_creator:
            self.image_creator.clean()

        super(CreateImageNegativeTests, self).__clean__()

    def test_none_image_name(self):
        """
        Expect an exception when the image name is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        # No self.fail() after the call: inside this block its AssertionError would itself satisfy
        # assertRaises and make the test pass unconditionally
        with self.assertRaises(Exception):
            self.image_creator = create_image.OpenStackImage(
                self.os_creds, create_image.ImageSettings(
                    name=None, image_user=os_image_settings.image_user, img_format=os_image_settings.format,
                    url=os_image_settings.url))

    def test_bad_image_url(self):
        """
        Expect an exception when the image download url is bad
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self.image_creator = create_image.OpenStackImage(self.os_creds, create_image.ImageSettings(
            name=os_image_settings.name, image_user=os_image_settings.image_user,
            img_format=os_image_settings.format, url="http://foo.bar"))
        with self.assertRaises(Exception):
            self.image_creator.create()

    def test_bad_image_file(self):
        """
        Expect an exception when the image file does not exist
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self.image_creator = create_image.OpenStackImage(
            self.os_creds,
            create_image.ImageSettings(name=os_image_settings.name, image_user=os_image_settings.image_user,
                                       img_format=os_image_settings.format, image_file="/foo/bar.qcow"))
        with self.assertRaises(Exception):
            self.image_creator.create()

    def test_none_proj_name(self):
        """
        Expect an exception when the project name is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        with self.assertRaises(Exception):
            self.image_creator = create_image.OpenStackImage(
                os_credentials.OSCreds(self.os_creds.username, self.os_creds.password, self.os_creds.auth_url, None,
                                       proxy_settings=self.os_creds.proxy_settings),
                os_image_settings)
            self.image_creator.create()

    def test_none_auth_url(self):
        """
        Expect an exception when the authentication URL is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        with self.assertRaises(Exception):
            self.image_creator = create_image.OpenStackImage(
                os_credentials.OSCreds(self.os_creds.username, self.os_creds.password, None,
                                       self.os_creds.project_name, proxy_settings=self.os_creds.proxy_settings),
                os_image_settings)
            self.image_creator.create()

    def test_none_password(self):
        """
        Expect an exception when the password is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        with self.assertRaises(Exception):
            # Reads auth_url like the sibling tests do; a misspelled attribute would raise AttributeError
            # and satisfy assertRaises without the credentials ever being exercised
            self.image_creator = create_image.OpenStackImage(
                os_credentials.OSCreds(self.os_creds.username, None, self.os_creds.auth_url,
                                       self.os_creds.project_name, proxy_settings=self.os_creds.proxy_settings),
                os_image_settings)
            self.image_creator.create()

    def test_none_user(self):
        """
        Expect an exception when the username is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        with self.assertRaises(Exception):
            # Reads auth_url like the sibling tests do; a misspelled attribute would raise AttributeError
            # and satisfy assertRaises without the credentials ever being exercised
            self.image_creator = create_image.OpenStackImage(
                os_credentials.OSCreds(None, self.os_creds.password, self.os_creds.auth_url,
                                       self.os_creds.project_name,
                                       proxy_settings=self.os_creds.proxy_settings),
                os_image_settings)
            self.image_creator.create()
class CreateMultiPartImageTests(OSIntegrationTestCase):
    """
    Test for creating a 3-part image
    """

    def setUp(self):
        """
        Instantiates the CreateImage object that is responsible for
        downloading and creating an OS image file within OpenStack
        """
        # Name the class explicitly: super(self.__class__, self) recurses forever once this class is subclassed
        super(CreateMultiPartImageTests, self).__start__()

        guid = uuid.uuid4()
        self.image_creators = list()
        self.image_name = self.__class__.__name__ + '-' + str(guid)
        self.glance = glance_utils.glance_client(self.os_creds)

        self.tmp_dir = 'tmp/' + str(guid)
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

    def tearDown(self):
        """
        Cleans the images and downloaded image file
        """
        # Clean in reverse creation order so the main image goes before its kernel/ramdisk images
        while self.image_creators:
            self.image_creators[-1].clean()
            self.image_creators.pop()

        if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
            shutil.rmtree(self.tmp_dir)

        super(CreateMultiPartImageTests, self).__clean__()

    def _metadata_value(self, key):
        """
        Returns the value held by self.image_metadata for key, or None when the metadata is unset, does not
        contain the key, or holds a falsy value for it
        """
        if self.image_metadata and key in self.image_metadata and self.image_metadata[key]:
            return self.image_metadata[key]
        return None

    def _create_image(self, image_settings):
        """
        Creates an image from image_settings and returns the object returned by create(). The creator is
        registered before create() is called so tearDown still cleans up when creation fails
        """
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, image_settings))
        return self.image_creators[-1].create()

    def test_create_three_part_image_from_url(self):
        """
        Tests the creation of a 3-part OpenStack image from a URL.
        """
        # Copied so the kernel/ramdisk IDs added below never leak into the shared image metadata
        properties = dict(self._metadata_value('extra_properties') or {})

        # Create the kernel image
        kernel_image_settings = openstack_tests.cirros_url_image(
            name=self.image_name+'_kernel',
            url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-kernel')
        kernel_url = self._metadata_value('kernel_url')
        if kernel_url:
            kernel_image_settings.url = kernel_url
        kernel_image = self._create_image(kernel_image_settings)
        self.assertIsNotNone(kernel_image)
        self.assertEqual(get_image_size(kernel_image_settings), kernel_image.size)

        # Create the ramdisk image
        ramdisk_image_settings = openstack_tests.cirros_url_image(
            name=self.image_name+'_ramdisk',
            url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs')
        ramdisk_url = self._metadata_value('ramdisk_url')
        if ramdisk_url:
            ramdisk_image_settings.url = ramdisk_url
        ramdisk_image = self._create_image(ramdisk_image_settings)
        self.assertIsNotNone(ramdisk_image)
        self.assertEqual(get_image_size(ramdisk_image_settings), ramdisk_image.size)

        # Create the main image
        os_image_settings = openstack_tests.cirros_url_image(
            name=self.image_name,
            url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img')
        disk_url = self._metadata_value('disk_url')
        if disk_url:
            os_image_settings.url = disk_url

        properties['kernel_id'] = kernel_image.id
        properties['ramdisk_id'] = ramdisk_image.id
        os_image_settings.extra_properties = properties

        created_image = self._create_image(os_image_settings)
        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)

    def test_create_three_part_image_from_file(self):
        """
        Tests the creation of a 3-part OpenStack image from files.
        """
        # Copied so the kernel/ramdisk IDs added below never leak into the shared image metadata
        properties = dict(self._metadata_value('extra_properties') or {})

        # Create the kernel image
        kernel_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-kernel'
        if self._metadata_value('kernel_url'):
            kernel_url = self._metadata_value('kernel_url')
        kernel_image_file = file_utils.download(kernel_url, self.tmp_dir)
        kernel_file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name+'_kernel', file_path=kernel_image_file.name)
        kernel_image = self._create_image(kernel_file_image_settings)
        self.assertIsNotNone(kernel_image)
        self.assertEqual(get_image_size(kernel_file_image_settings), kernel_image.size)

        # Create the ramdisk image
        ramdisk_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs'
        if self._metadata_value('ramdisk_url'):
            ramdisk_url = self._metadata_value('ramdisk_url')
        ramdisk_image_file = file_utils.download(ramdisk_url, self.tmp_dir)
        ramdisk_file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name+'_ramdisk', file_path=ramdisk_image_file.name)
        ramdisk_image = self._create_image(ramdisk_file_image_settings)
        self.assertIsNotNone(ramdisk_image)
        self.assertEqual(get_image_size(ramdisk_file_image_settings), ramdisk_image.size)

        # Create the main image; honours the 'disk_url' override like the URL-based test does
        image_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img'
        if self._metadata_value('disk_url'):
            image_url = self._metadata_value('disk_url')
        image_file = file_utils.download(image_url, self.tmp_dir)
        file_image_settings = openstack_tests.file_image_test_settings(name=self.image_name, file_path=image_file.name)
        properties['kernel_id'] = kernel_image.id
        properties['ramdisk_id'] = ramdisk_image.id
        file_image_settings.extra_properties = properties
        created_image = self._create_image(file_image_settings)

        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
        self.assertIsNotNone(retrieved_image)

        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)

        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)
def get_image_size(image_settings):
    """
    Returns the expected image size
    :param image_settings: object exposing the image_file and url attributes
    :return: the size in bytes of the local file when image_file is set, otherwise the content length
             reported for the URL as an int
    :raise Exception: when neither image_file nor url has been configured
    """
    # A local file takes precedence over the URL
    if image_settings.image_file:
        return os.path.getsize(image_settings.image_file)

    if image_settings.url:
        return int(file_utils.get_content_length(image_settings.url))

    raise Exception('Cannot retrieve expected image size. Image filename or URL has not been configured')