1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
20 from snaps import file_utils
21 from snaps.openstack.create_image import ImageSettings
23 import openstack_tests
24 from snaps.openstack.utils import glance_utils
25 from snaps.openstack import create_image
26 from snaps.openstack import os_credentials
27 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
29 __author__ = 'spisarski'
class ImageSettingsUnitTests(unittest.TestCase):
    """
    Tests the construction of the ImageSettings class.

    The negative tests only assert that some Exception is raised; the positive
    tests assert the attribute values exposed by the constructed object.
    """

    def test_no_params(self):
        """
        Expect an exception when no arguments are supplied
        """
        with self.assertRaises(Exception):
            ImageSettings()

    def test_empty_config(self):
        """
        Expect an exception when the config dict is empty
        """
        with self.assertRaises(Exception):
            ImageSettings(config=dict())

    def test_name_only(self):
        """
        Expect an exception when only the name is supplied
        """
        with self.assertRaises(Exception):
            ImageSettings(name='foo')

    def test_config_with_name_only(self):
        """
        Expect an exception when the config dict only holds the name
        """
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo'})

    def test_name_user_only(self):
        """
        Expect an exception when only the name and image user are supplied
        """
        with self.assertRaises(Exception):
            ImageSettings(name='foo', image_user='bar')

    def test_config_with_name_user_only(self):
        """
        Expect an exception when the config dict only holds the name and image user
        """
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar'})

    def test_name_user_format_only(self):
        """
        Expect an exception when neither a URL nor an image file is supplied
        """
        with self.assertRaises(Exception):
            ImageSettings(name='foo', image_user='bar', img_format='qcow2')

    def test_config_with_name_user_format_only(self):
        """
        Expect an exception when the config dict holds neither a URL nor an image file
        """
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2'})

    def test_name_user_format_url_file_only(self):
        """
        Expect an exception when both a URL and an image file are supplied
        """
        with self.assertRaises(Exception):
            ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                          image_file='/foo/bar.qcow')

    def test_config_with_name_user_format_url_file_only(self):
        """
        Expect an exception when the config dict holds both a URL and an image file
        """
        with self.assertRaises(Exception):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                  'download_url': 'http://foo.com', 'image_file': '/foo/bar.qcow'})

    def test_name_user_format_url_only(self):
        """
        Tests construction from keyword arguments with a URL source
        """
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertIsNone(settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_name_user_format_url_only_properties(self):
        """
        Tests construction from keyword arguments with a URL source and extra properties
        """
        properties = {'hw_video_model': 'vga'}
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                                 extra_properties=properties)
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertEqual(properties, settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_config_with_name_user_format_url_only(self):
        """
        Tests construction from a config dict with a URL source
        """
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'download_url': 'http://foo.com'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertIsNone(settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_name_user_format_file_only(self):
        """
        Tests construction from keyword arguments with a file source
        """
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_config_with_name_user_format_file_only(self):
        """
        Tests construction from a config dict with a file source
        """
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'image_file': '/foo/bar.qcow'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_all_url(self):
        """
        Tests construction from keyword arguments with a URL source and every optional value set,
        including nested kernel and ramdisk settings
        """
        properties = {'hw_video_model': 'vga'}
        kernel_settings = ImageSettings(name='kernel', url='http://kernel.com', image_user='bar',
                                        img_format='qcow2')
        ramdisk_settings = ImageSettings(name='ramdisk', url='http://ramdisk.com', image_user='bar',
                                         img_format='qcow2')
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                                 extra_properties=properties, nic_config_pb_loc='/foo/bar',
                                 kernel_image_settings=kernel_settings, ramdisk_image_settings=ramdisk_settings)
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertEqual(properties, settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
        self.assertEqual('kernel', settings.kernel_image_settings.name)
        self.assertEqual('http://kernel.com', settings.kernel_image_settings.url)
        self.assertEqual('bar', settings.kernel_image_settings.image_user)
        self.assertEqual('qcow2', settings.kernel_image_settings.format)
        self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
        self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url)
        self.assertEqual('bar', settings.ramdisk_image_settings.image_user)
        self.assertEqual('qcow2', settings.ramdisk_image_settings.format)

    def test_config_all_url(self):
        """
        Tests construction from a config dict with a URL source and every optional value set,
        including nested kernel and ramdisk config dicts
        """
        settings = ImageSettings(
            config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                    'download_url': 'http://foo.com',
                    'extra_properties': '{\'hw_video_model\': \'vga\'}',
                    'nic_config_pb_loc': '/foo/bar',
                    'kernel_image_settings': {'name': 'kernel', 'download_url': 'http://kernel.com',
                                              'image_user': 'bar', 'format': 'qcow2'},
                    'ramdisk_image_settings': {'name': 'ramdisk', 'download_url': 'http://ramdisk.com',
                                               'image_user': 'bar', 'format': 'qcow2'}})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        # The config value is a string here and is expected back unchanged
        self.assertEqual('{\'hw_video_model\': \'vga\'}', settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
        self.assertEqual('kernel', settings.kernel_image_settings.name)
        self.assertEqual('http://kernel.com', settings.kernel_image_settings.url)
        self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
        self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url)

    def test_all_file(self):
        """
        Tests construction from keyword arguments with a file source and the optional values set
        """
        properties = {'hw_video_model': 'vga'}
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow',
                                 extra_properties=properties, nic_config_pb_loc='/foo/bar')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertEqual(properties, settings.extra_properties)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)

    def test_config_all_file(self):
        """
        Tests construction from a config dict with a file source and the optional values set
        """
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'image_file': '/foo/bar.qcow',
                                         'extra_properties': '{\'hw_video_model\' : \'vga\'}',
                                         'nic_config_pb_loc': '/foo/bar'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        # The config value is a string here and is expected back unchanged
        self.assertEqual('{\'hw_video_model\' : \'vga\'}', settings.extra_properties)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
class CreateImageSuccessTests(OSIntegrationTestCase):
    """
    Test for the CreateImage class defined in create_image.py
    """

    def setUp(self):
        """
        Generates a unique image name, obtains a glance client and creates the temporary
        directory into which image files are downloaded
        """
        # Name the class explicitly: super(self.__class__, self) recurses forever once subclassed
        super(CreateImageSuccessTests, self).__start__()

        guid = uuid.uuid4()
        self.image_name = self.__class__.__name__ + '-' + str(guid)
        self.glance = glance_utils.glance_client(self.os_creds)
        self.image_creator = None

        self.tmp_dir = 'tmp/' + str(guid)
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

    def tearDown(self):
        """
        Cleans the image and downloaded image file
        """
        if self.image_creator:
            self.image_creator.clean()

        if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
            shutil.rmtree(self.tmp_dir)

        super(CreateImageSuccessTests, self).__clean__()

    def test_create_image_clean_url(self):
        """
        Tests the creation of an OpenStack image from a URL.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name,
                                                             image_metadata=self.image_metadata)

        self.image_creator = create_image.OpenStackImage(self.os_creds, os_image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(created_image.size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_image_clean_url_properties(self):
        """
        Tests the creation of an OpenStack image from a URL and set properties.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name,
                                                             image_metadata=self.image_metadata)

        self.image_creator = create_image.OpenStackImage(self.os_creds, os_image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)

    def test_create_image_clean_file(self):
        """
        Tests the creation of an OpenStack image from a file.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        url_image_settings = openstack_tests.cirros_url_image(self.image_name,
                                                              image_metadata=self.image_metadata)

        # Download the file of the image
        image_file = file_utils.download(url_image_settings.url, self.tmp_dir)
        file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name, file_path=image_file.name, image_metadata=self.image_metadata)

        self.image_creator = create_image.OpenStackImage(self.os_creds, file_image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)

        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_delete_image(self):
        """
        Tests the creation then deletion of an OpenStack image to ensure clean() does not raise an Exception.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name,
                                                             image_metadata=self.image_metadata)

        self.image_creator = create_image.OpenStackImage(self.os_creds, os_image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)

        # Delete Image manually
        glance_utils.delete_image(self.glance, created_image)

        self.assertIsNone(glance_utils.get_image(self.glance, self.image_creator.image_settings.name))

        # Must not throw an exception when attempting to cleanup non-existent image
        self.image_creator.clean()
        self.assertIsNone(self.image_creator.get_image())

    def test_create_same_image(self):
        """
        Tests the creation of an OpenStack image when the image already exists.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name,
                                                             image_metadata=self.image_metadata)

        self.image_creator = create_image.OpenStackImage(self.os_creds, os_image_settings)
        image1 = self.image_creator.create()

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)
        self.assertEqual(image1.name, retrieved_image.name)
        self.assertEqual(image1.id, retrieved_image.id)
        self.assertEqual(image1.properties, retrieved_image.properties)

        # Should be retrieving the instance data
        os_image_2 = create_image.OpenStackImage(self.os_creds, os_image_settings)
        image2 = os_image_2.create()
        self.assertEqual(image1.id, image2.id)
class CreateImageNegativeTests(OSIntegrationTestCase):
    """
    Negative test cases for the CreateImage class
    """

    def setUp(self):
        """
        Generates a unique image name and clears the image creator reference
        """
        # Name the class explicitly: super(self.__class__, self) recurses forever once subclassed
        super(CreateImageNegativeTests, self).__start__()

        self.image_name = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.image_creator = None

    def tearDown(self):
        """
        Cleans the image when a test managed to assign an image creator
        """
        if self.image_creator:
            self.image_creator.clean()

        super(CreateImageNegativeTests, self).__clean__()

    def test_none_image_name(self):
        """
        Expect an exception when the image name is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        # No self.fail() inside this block: its AssertionError is itself an Exception, so
        # assertRaises would swallow it and the test could never fail.
        with self.assertRaises(Exception):
            self.image_creator = create_image.OpenStackImage(
                self.os_creds, create_image.ImageSettings(
                    name=None, image_user=os_image_settings.image_user, img_format=os_image_settings.format,
                    url=os_image_settings.url))

    def test_bad_image_url(self):
        """
        Expect an exception when the image download url is bad
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self.image_creator = create_image.OpenStackImage(self.os_creds, create_image.ImageSettings(
            name=os_image_settings.name, image_user=os_image_settings.image_user,
            img_format=os_image_settings.format, url="http://foo.bar"))
        with self.assertRaises(Exception):
            self.image_creator.create()

    def test_bad_image_file(self):
        """
        Expect an exception when the image file does not exist
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        self.image_creator = create_image.OpenStackImage(
            self.os_creds,
            create_image.ImageSettings(name=os_image_settings.name, image_user=os_image_settings.image_user,
                                       img_format=os_image_settings.format, image_file="/foo/bar.qcow"))
        with self.assertRaises(Exception):
            self.image_creator.create()

    def test_none_proj_name(self):
        """
        Expect an exception when the project name is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        with self.assertRaises(Exception):
            self.image_creator = create_image.OpenStackImage(
                os_credentials.OSCreds(self.os_creds.username, self.os_creds.password, self.os_creds.auth_url, None,
                                       proxy_settings=self.os_creds.proxy_settings),
                os_image_settings)
            self.image_creator.create()

    def test_none_auth_url(self):
        """
        Expect an exception when the authentication URL is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        with self.assertRaises(Exception):
            self.image_creator = create_image.OpenStackImage(
                os_credentials.OSCreds(self.os_creds.username, self.os_creds.password, None,
                                       self.os_creds.project_name, proxy_settings=self.os_creds.proxy_settings),
                os_image_settings)
            self.image_creator.create()

    def test_none_password(self):
        """
        Expect an exception when the password is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        with self.assertRaises(Exception):
            # Reads auth_url, the same attribute test_none_proj_name uses, so that the exception
            # comes from the missing password rather than from an attribute lookup
            self.image_creator = create_image.OpenStackImage(
                os_credentials.OSCreds(self.os_creds.username, None, self.os_creds.auth_url,
                                       self.os_creds.project_name, proxy_settings=self.os_creds.proxy_settings),
                os_image_settings)

    def test_none_user(self):
        """
        Expect an exception when the username is None
        """
        os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
        with self.assertRaises(Exception):
            # Reads auth_url, the same attribute test_none_proj_name uses, so that the exception
            # comes from the missing username rather than from an attribute lookup
            self.image_creator = create_image.OpenStackImage(
                os_credentials.OSCreds(None, self.os_creds.password, self.os_creds.auth_url,
                                       self.os_creds.project_name,
                                       proxy_settings=self.os_creds.proxy_settings),
                os_image_settings)
class CreateMultiPartImageTests(OSIntegrationTestCase):
    """
    Test different means for creating 3-part images
    """

    def setUp(self):
        """
        Generates a unique image name, obtains a glance client and creates the temporary
        directory into which image files are downloaded
        """
        # Name the class explicitly: super(self.__class__, self) recurses forever once subclassed
        super(CreateMultiPartImageTests, self).__start__()

        guid = uuid.uuid4()
        self.image_creators = list()
        self.image_name = self.__class__.__name__ + '-' + str(guid)
        self.glance = glance_utils.glance_client(self.os_creds)

        self.tmp_dir = 'tmp/' + str(guid)
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

    def tearDown(self):
        """
        Cleans the images and downloaded image file
        """
        for image_creator in self.image_creators:
            image_creator.clean()

        if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
            shutil.rmtree(self.tmp_dir)

        super(CreateMultiPartImageTests, self).__clean__()

    def _metadata_value(self, key, default=None):
        """
        Returns self.image_metadata[key] when the metadata is set and holds a truthy value for key,
        otherwise returns default
        """
        if self.image_metadata and key in self.image_metadata and self.image_metadata[key]:
            return self.image_metadata[key]
        return default

    def _extra_properties(self):
        """
        Returns a new dict seeded with any configured 'extra_properties', so that adding the
        kernel/ramdisk IDs does not modify self.image_metadata
        """
        properties = {}
        configured = self._metadata_value('extra_properties')
        if configured:
            properties.update(configured)
        return properties

    def test_create_three_part_image_from_url(self):
        """
        Tests the creation of a 3-part OpenStack image from a URL.
        """
        # A single creator is given the disk, kernel and ramdisk URLs via the image metadata
        image_settings = openstack_tests.cirros_url_image(
            name=self.image_name,
            image_metadata={'disk_url': 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img',
                            'kernel_url': 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-kernel',
                            'ramdisk_url': 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs'})

        image_creator = create_image.OpenStackImage(self.os_creds, image_settings)
        self.image_creators.append(image_creator)
        image_creator.create()

        main_image = glance_utils.get_image(self.glance, image_settings.name)
        self.assertIsNotNone(main_image)
        self.assertIsNotNone(image_creator.get_image())
        self.assertEqual(image_creator.get_image().id, main_image.id)

        kernel_image = glance_utils.get_image(self.glance, image_settings.kernel_image_settings.name)
        self.assertIsNotNone(kernel_image)
        self.assertIsNotNone(image_creator.get_kernel_image())
        self.assertEqual(kernel_image.id, image_creator.get_kernel_image().id)

        ramdisk_image = glance_utils.get_image(self.glance, image_settings.ramdisk_image_settings.name)
        self.assertIsNotNone(ramdisk_image)
        self.assertIsNotNone(image_creator.get_ramdisk_image())
        self.assertEqual(ramdisk_image.id, image_creator.get_ramdisk_image().id)

    def test_create_three_part_image_from_file_3_creators(self):
        """
        Tests the creation of a 3-part OpenStack image from files.
        """
        properties = self._extra_properties()

        # Create the kernel image
        kernel_url = self._metadata_value(
            'kernel_url', 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-kernel')
        kernel_image_file = file_utils.download(kernel_url, self.tmp_dir)
        kernel_file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name+'_kernel', file_path=kernel_image_file.name)
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_file_image_settings))
        kernel_image = self.image_creators[-1].create()
        self.assertIsNotNone(kernel_image)
        self.assertEqual(get_image_size(kernel_file_image_settings), kernel_image.size)

        # Create the ramdisk image
        ramdisk_url = self._metadata_value(
            'ramdisk_url', 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs')
        ramdisk_image_file = file_utils.download(ramdisk_url, self.tmp_dir)
        ramdisk_file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name+'_ramdisk', file_path=ramdisk_image_file.name)
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_file_image_settings))
        ramdisk_image = self.image_creators[-1].create()
        self.assertIsNotNone(ramdisk_image)
        self.assertEqual(get_image_size(ramdisk_file_image_settings), ramdisk_image.size)

        # Create the main image
        image_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img'
        image_file = file_utils.download(image_url, self.tmp_dir)
        file_image_settings = openstack_tests.file_image_test_settings(name=self.image_name,
                                                                       file_path=image_file.name)
        # Link the main image to the kernel and ramdisk images created above
        properties['kernel_id'] = kernel_image.id
        properties['ramdisk_id'] = ramdisk_image.id
        file_image_settings.extra_properties = properties
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, file_image_settings))
        created_image = self.image_creators[-1].create()

        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)

    def test_create_three_part_image_from_url_3_creators(self):
        """
        Tests the creation of a 3-part OpenStack image from a URL.
        """
        # Guarded lookup: metadata without an 'extra_properties' key must not raise KeyError
        properties = self._extra_properties()

        # Create the kernel image
        kernel_image_settings = openstack_tests.cirros_url_image(
            name=self.image_name+'_kernel',
            url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-kernel')

        kernel_url = self._metadata_value('kernel_url')
        if kernel_url:
            kernel_image_settings.url = kernel_url
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
        kernel_image = self.image_creators[-1].create()
        self.assertIsNotNone(kernel_image)
        self.assertEqual(get_image_size(kernel_image_settings), kernel_image.size)

        # Create the ramdisk image
        ramdisk_image_settings = openstack_tests.cirros_url_image(
            name=self.image_name+'_ramdisk',
            url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs')
        ramdisk_url = self._metadata_value('ramdisk_url')
        if ramdisk_url:
            ramdisk_image_settings.url = ramdisk_url
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
        ramdisk_image = self.image_creators[-1].create()
        self.assertIsNotNone(ramdisk_image)
        self.assertEqual(get_image_size(ramdisk_image_settings), ramdisk_image.size)

        # Create the main image
        os_image_settings = openstack_tests.cirros_url_image(
            name=self.image_name,
            url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img')
        disk_url = self._metadata_value('disk_url')
        if disk_url:
            os_image_settings.url = disk_url

        # Link the main image to the kernel and ramdisk images created above
        properties['kernel_id'] = kernel_image.id
        properties['ramdisk_id'] = ramdisk_image.id
        os_image_settings.extra_properties = properties

        self.image_creators.append(create_image.OpenStackImage(self.os_creds, os_image_settings))
        created_image = self.image_creators[-1].create()
        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)

        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)

        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)
def get_image_size(image_settings):
    """
    Returns the expected image size
    :param image_settings: object exposing the 'image_file' and 'url' attributes
    :return: the size of the local file as reported by os.path.getsize() when 'image_file' is
             set; otherwise the content length reported for 'url', converted to int
    :raise Exception: when neither 'image_file' nor 'url' is set
    """
    # A local file takes precedence; the URL is only consulted when no file is configured
    local_path = image_settings.image_file
    if local_path:
        return os.path.getsize(local_path)

    remote_url = image_settings.url
    if remote_url:
        return int(file_utils.get_content_length(remote_url))

    raise Exception('Cannot retrieve expected image size. Image filename or URL has not been configured')