1 ##############################################################################
2 # Copyright (c) 2017 Intel Corporation
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
14 from yardstick.common import exceptions
15 from yardstick.common import kubernetes_utils
16 from yardstick.orchestrator import kubernetes
17 from yardstick.tests.unit import base
20 class GetTemplateTestCase(base.BaseUnitTestCase):
22 def test_get_template(self):
25 "kind": "ReplicationController",
27 "name": "host-k8s-86096c30"
34 "app": "host-k8s-86096c30"
42 "chmod 700 ~/.ssh; chmod 600 ~/.ssh/*; \
43 service ssh restart;while true ; do sleep 10000; done"
48 "image": "openretriever/yardstick",
49 "name": "host-k8s-86096c30-container",
52 "mountPath": "/tmp/.ssh/",
53 "name": "k8s-86096c30-key",
62 "name": "k8s-86096c30-key"
64 "name": "k8s-86096c30-key"
68 "kubernetes.io/hostname": "node-01"
70 "restartPolicy": "Always",
72 {"operator": "Exists"}
79 'command': '/bin/bash',
80 'args': ['-c', 'chmod 700 ~/.ssh; chmod 600 ~/.ssh/*; \
81 service ssh restart;while true ; do sleep 10000; done'],
82 'ssh_key': 'k8s-86096c30-key',
83 'nodeSelector': {'kubernetes.io/hostname': 'node-01'},
85 'restartPolicy': 'Always'
87 name = 'host-k8s-86096c30'
88 output_r = kubernetes.ReplicationControllerObject(
89 name, **input_s).get_template()
90 self.assertEqual(output_r, output_t)
92 def test_get_template_invalid_restart_policy(self):
93 input_s = {'restartPolicy': 'invalid_option'}
94 name = 'host-k8s-86096c30'
95 with self.assertRaises(exceptions.KubernetesWrongRestartPolicy):
96 kubernetes.ReplicationControllerObject(
97 name, **input_s).get_template()
100 class GetRcPodsTestCase(base.BaseUnitTestCase):
102 @mock.patch('yardstick.orchestrator.kubernetes.k8s_utils.get_pod_list')
103 def test_get_rc_pods(self, mock_get_pod_list):
106 'image': 'openretriever/yardstick',
107 'command': '/bin/bash',
108 'args': ['-c', 'chmod 700 ~/.ssh; chmod 600 ~/.ssh/*; \
109 service ssh restart;while true ; do sleep 10000; done']
112 'image': 'openretriever/yardstick',
113 'command': '/bin/bash',
114 'args': ['-c', 'chmod 700 ~/.ssh; chmod 600 ~/.ssh/*; \
115 service ssh restart;while true ; do sleep 10000; done']
118 k8s_template = kubernetes.KubernetesTemplate('k8s-86096c30', servers)
119 mock_get_pod_list.return_value.items = []
120 pods = k8s_template.get_rc_pods()
121 self.assertEqual(pods, [])
124 class ReplicationControllerObjectTestCase(base.BaseUnitTestCase):
126 def test__init_one_container(self):
127 pod_name = 'pod_name'
128 _kwargs = {'args': ['arg1', 'arg2'],
129 'image': 'fake_image',
130 'command': 'fake_command'}
131 k8s_obj = kubernetes.ReplicationControllerObject(pod_name, **_kwargs)
132 self.assertEqual(1, len(k8s_obj._containers))
133 container = k8s_obj._containers[0]
134 self.assertEqual(['arg1', 'arg2'], container._args)
135 self.assertEqual('fake_image', container._image)
136 self.assertEqual(['fake_command'], container._command)
137 self.assertEqual([], container._volume_mounts)
139 def test__init_multipe_containers(self):
140 pod_name = 'pod_name'
143 containers.append({'args': ['arg1', 'arg2'],
144 'image': 'fake_image_%s' % i,
145 'command': 'fake_command_%s' % i})
146 _kwargs = {'containers': containers}
147 k8s_obj = kubernetes.ReplicationControllerObject(pod_name, **_kwargs)
148 self.assertEqual(5, len(k8s_obj._containers))
150 container = k8s_obj._containers[i]
151 self.assertEqual(['arg1', 'arg2'], container._args)
152 self.assertEqual('fake_image_%s' % i, container._image)
153 self.assertEqual(['fake_command_%s' % i], container._command)
154 self.assertEqual([], container._volume_mounts)
156 def test__add_volumes(self):
157 volume1 = {'name': 'fake_sshkey',
158 'configMap': {'name': 'fake_sshkey'}}
159 volume2 = {'name': 'volume2',
161 k8s_obj = kubernetes.ReplicationControllerObject(
162 'name', ssh_key='fake_sshkey', volumes=[volume2])
163 k8s_obj._add_volumes()
164 volumes = k8s_obj.template['spec']['template']['spec']['volumes']
165 self.assertEqual(sorted([volume1, volume2], key=lambda k: k['name']),
166 sorted(volumes, key=lambda k: k['name']))
168 def test__add_volumes_no_volumes(self):
169 volume1 = {'name': 'fake_sshkey',
170 'configMap': {'name': 'fake_sshkey'}}
171 k8s_obj = kubernetes.ReplicationControllerObject(
172 'name', ssh_key='fake_sshkey')
173 k8s_obj._add_volumes()
174 volumes = k8s_obj.template['spec']['template']['spec']['volumes']
175 self.assertEqual([volume1], volumes)
177 def test__create_ssh_key_volume(self):
178 expected = {'name': 'fake_sshkey',
179 'configMap': {'name': 'fake_sshkey'}}
180 k8s_obj = kubernetes.ReplicationControllerObject(
181 'name', ssh_key='fake_sshkey')
182 self.assertEqual(expected, k8s_obj._create_ssh_key_volume())
184 def test__create_volume_item(self):
185 for vol_type in kubernetes_utils.get_volume_types():
186 volume = {'name': 'vol_name',
190 kubernetes.ReplicationControllerObject.
191 _create_volume_item(volume))
193 def test__create_volume_item_invalid_type(self):
194 volume = {'name': 'vol_name',
195 'invalid_type': 'data'}
196 with self.assertRaises(exceptions.KubernetesTemplateInvalidVolumeType):
197 kubernetes.ReplicationControllerObject._create_volume_item(volume)
199 def test__add_security_context(self):
200 k8s_obj = kubernetes.ReplicationControllerObject('pod_name')
201 self.assertNotIn('securityContext',
202 k8s_obj.template['spec']['template']['spec'])
204 k8s_obj._security_context = {'key_pod': 'value_pod'}
205 k8s_obj._add_security_context()
207 {'key_pod': 'value_pod'},
208 k8s_obj.template['spec']['template']['spec']['securityContext'])
210 def test__add_security_context_by_init(self):
214 {'securityContext': {'key%s' % i: 'value%s' % i}})
215 _kwargs = {'containers': containers,
216 'securityContext': {'key_pod': 'value_pod'}}
217 k8s_obj = kubernetes.ReplicationControllerObject('pod_name', **_kwargs)
219 {'key_pod': 'value_pod'},
220 k8s_obj.template['spec']['template']['spec']['securityContext'])
223 k8s_obj.template['spec']['template']['spec']['containers'][i])
224 self.assertEqual({'key%s' % i: 'value%s' % i},
225 container['securityContext'])
227 def test__add_networks(self):
228 k8s_obj = kubernetes.ReplicationControllerObject(
229 'name', networks=['network1', 'network2', 'network3'])
230 k8s_obj._add_networks()
232 template['spec']['template']['metadata']['annotations']['networks']
233 expected = ('[{"name": "network1"}, {"name": "network2"}, '
234 '{"name": "network3"}]')
235 self.assertEqual(expected, networks)
237 def test__add_tolerations(self):
238 _kwargs = {'tolerations': [{'key': 'key1',
241 'operator': 'operator4',
242 'wrong_key': 'error_key'}]
244 k8s_obj = kubernetes.ReplicationControllerObject('pod_name', **_kwargs)
245 k8s_obj._add_tolerations()
246 _tol = k8s_obj.template['spec']['template']['spec']['tolerations']
247 self.assertEqual(1, len(_tol))
248 self.assertEqual({'key': 'key1',
251 'operator': 'operator4'},
254 def test__add_tolerations_default(self):
255 k8s_obj = kubernetes.ReplicationControllerObject('pod_name')
256 k8s_obj._add_tolerations()
257 _tol = k8s_obj.template['spec']['template']['spec']['tolerations']
258 self.assertEqual(1, len(_tol))
259 self.assertEqual({'operator': 'Exists'}, _tol[0])
262 class ContainerObjectTestCase(base.BaseUnitTestCase):
264 def test__create_volume_mounts(self):
265 volume_mount = {'name': 'fake_name',
266 'mountPath': 'fake_path'}
267 ssh_vol = {'name': 'fake_ssh_key',
268 'mountPath': kubernetes.ContainerObject.SSH_MOUNT_PATH,
270 expected = copy.deepcopy(volume_mount)
271 expected['readOnly'] = False
272 expected = [expected, ssh_vol]
273 container_obj = kubernetes.ContainerObject(
274 'cname', 'fake_ssh_key', volumeMounts=[volume_mount])
275 output = container_obj._create_volume_mounts()
276 self.assertEqual(expected, output)
278 def test__create_volume_mounts_no_volume_mounts(self):
279 ssh_vol = {'name': 'fake_ssh_key2',
280 'mountPath': kubernetes.ContainerObject.SSH_MOUNT_PATH,
282 container_obj = kubernetes.ContainerObject('name', 'fake_ssh_key2')
283 output = container_obj._create_volume_mounts()
284 self.assertEqual([ssh_vol], output)
286 def test__create_volume_mounts_item(self):
287 volume_mount = {'name': 'fake_name',
288 'mountPath': 'fake_path'}
289 expected = copy.deepcopy(volume_mount)
290 expected['readOnly'] = False
291 output = kubernetes.ContainerObject._create_volume_mounts_item(
293 self.assertEqual(expected, output)
295 def test_get_container_item(self):
296 volume_mount = {'name': 'fake_name',
297 'mountPath': 'fake_path'}
298 args = ['arg1', 'arg2']
299 container_obj = kubernetes.ContainerObject(
300 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
302 expected = {'args': args,
303 'command': [kubernetes.ContainerObject.COMMAND_DEFAULT],
304 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
305 'name': 'cname-container',
306 'volumeMounts': container_obj._create_volume_mounts()}
307 self.assertEqual(expected, container_obj.get_container_item())
309 def test_get_container_item_with_security_context(self):
310 volume_mount = {'name': 'fake_name',
311 'mountPath': 'fake_path'}
312 args = ['arg1', 'arg2']
313 container_obj = kubernetes.ContainerObject(
314 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
315 args=args, securityContext={'key': 'value'})
316 expected = {'args': args,
317 'command': [kubernetes.ContainerObject.COMMAND_DEFAULT],
318 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
319 'name': 'cname-container',
320 'volumeMounts': container_obj._create_volume_mounts(),
321 'securityContext': {'key': 'value'}}
322 self.assertEqual(expected, container_obj.get_container_item())
324 def test_get_container_item_with_env(self):
325 volume_mount = {'name': 'fake_name',
326 'mountPath': 'fake_path'}
327 args = ['arg1', 'arg2']
328 container_obj = kubernetes.ContainerObject(
329 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
330 args=args, env=[{'name': 'fake_var_name',
331 'value': 'fake_var_value'}])
332 expected = {'args': args,
333 'command': [kubernetes.ContainerObject.COMMAND_DEFAULT],
334 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
335 'name': 'cname-container',
336 'volumeMounts': container_obj._create_volume_mounts(),
337 'env': [{'name': 'fake_var_name',
338 'value': 'fake_var_value'}]}
339 self.assertEqual(expected, container_obj.get_container_item())
341 def test_get_container_item_with_ports_multi_parameter(self):
342 volume_mount = {'name': 'fake_name',
343 'mountPath': 'fake_path'}
344 args = ['arg1', 'arg2']
345 container_obj = kubernetes.ContainerObject(
346 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
347 args=args, ports=[{'containerPort': 'fake_port_name',
348 'hostPort': 'fake_host_port',
350 'protocol': 'fake_protocol',
351 'invalid_varible': 'fakeinvalid_varible',
352 'hostIP': 'fake_port_number'}])
353 expected = {'args': args,
355 kubernetes.ContainerObject.COMMAND_DEFAULT],
356 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
357 'name': 'cname-container',
358 'volumeMounts': container_obj._create_volume_mounts(),
359 'ports': [{'containerPort': 'fake_port_name',
360 'hostPort': 'fake_host_port',
362 'protocol': 'fake_protocol',
363 'hostIP': 'fake_port_number'}]}
364 self.assertEqual(expected, container_obj.get_container_item())
366 def test_get_container_item_with_ports_no_container_port(self):
367 with self.assertRaises(exceptions.KubernetesContainerPortNotDefined):
368 volume_mount = {'name': 'fake_name',
369 'mountPath': 'fake_path'}
370 args = ['arg1', 'arg2']
371 container_obj = kubernetes.ContainerObject(
372 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
373 args=args, ports=[{'hostPort': 'fake_host_port',
375 'protocol': 'fake_protocol',
376 'hostIP': 'fake_port_number'}])
377 container_obj.get_container_item()
379 def test_get_container_item_with_resources(self):
380 volume_mount = {'name': 'fake_name',
381 'mountPath': 'fake_path'}
382 args = ['arg1', 'arg2']
383 resources = {'requests': {'key1': 'val1'},
384 'limits': {'key2': 'val2'},
385 'other_key': {'key3': 'val3'}}
386 container_obj = kubernetes.ContainerObject(
387 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
388 args=args, resources=resources)
389 expected = {'args': args,
390 'command': [kubernetes.ContainerObject.COMMAND_DEFAULT],
391 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
392 'name': 'cname-container',
393 'volumeMounts': container_obj._create_volume_mounts(),
394 'resources': {'requests': {'key1': 'val1'},
395 'limits': {'key2': 'val2'}}}
396 self.assertEqual(expected, container_obj.get_container_item())
398 def test_get_container_item_image_pull_policy(self):
399 container_obj = kubernetes.ContainerObject(
400 'cname', ssh_key='fake_sshkey', imagePullPolicy='Always')
401 expected = {'args': [],
402 'command': [kubernetes.ContainerObject.COMMAND_DEFAULT],
403 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
404 'name': 'cname-container',
405 'volumeMounts': container_obj._create_volume_mounts(),
406 'imagePullPolicy':'Always'}
407 self.assertEqual(expected, container_obj.get_container_item())
410 class CustomResourceDefinitionObjectTestCase(base.BaseUnitTestCase):
412 def test__init(self):
415 'name': 'newcrds.ctx_name.com'
418 'group': 'ctx_name.com',
421 'names': {'plural': 'newcrds',
422 'singular': 'newcrd',
426 crd_obj = kubernetes.CustomResourceDefinitionObject(
427 'ctx_name', name='newcrd', version='v2', scope='scope')
428 self.assertEqual('newcrds.ctx_name.com', crd_obj._name)
429 self.assertEqual(template, crd_obj._template)
431 def test__init_missing_parameter(self):
432 with self.assertRaises(exceptions.KubernetesCRDObjectDefinitionError):
433 kubernetes.CustomResourceDefinitionObject('ctx_name',
437 class NetworkObjectTestCase(base.BaseUnitTestCase):
440 self.net_obj = kubernetes.NetworkObject(name='fake_name',
441 plugin='fake_plugin',
444 def test__init_missing_parameter(self):
445 with self.assertRaises(
446 exceptions.KubernetesNetworkObjectDefinitionError):
447 kubernetes.NetworkObject('network_name', plugin='plugin')
448 with self.assertRaises(
449 exceptions.KubernetesNetworkObjectDefinitionError):
450 kubernetes.NetworkObject('network_name', args='args')
452 @mock.patch.object(kubernetes_utils, 'get_custom_resource_definition')
453 def test_crd(self, mock_get_crd):
454 mock_crd = mock.Mock()
455 mock_get_crd.return_value = mock_crd
456 net_obj = copy.deepcopy(self.net_obj)
457 self.assertEqual(mock_crd, net_obj.crd)
459 def test_template(self):
460 net_obj = copy.deepcopy(self.net_obj)
461 expected = {'apiVersion': 'group.com/v2',
462 'kind': kubernetes.NetworkObject.KIND,
464 'name': 'fake_name'},
465 'plugin': 'fake_plugin',
468 crd.spec.group = 'group.com'
469 crd.spec.version = 'v2'
471 self.assertEqual(expected, net_obj.template)
473 def test_group(self):
474 net_obj = copy.deepcopy(self.net_obj)
475 net_obj._crd = mock.Mock()
476 net_obj._crd.spec.group = 'fake_group'
477 self.assertEqual('fake_group', net_obj.group)
479 def test_version(self):
480 net_obj = copy.deepcopy(self.net_obj)
481 net_obj._crd = mock.Mock()
482 net_obj._crd.spec.version = 'version_4'
483 self.assertEqual('version_4', net_obj.version)
485 def test_plural(self):
486 net_obj = copy.deepcopy(self.net_obj)
487 net_obj._crd = mock.Mock()
488 net_obj._crd.spec.names.plural = 'name_ending_in_s'
489 self.assertEqual('name_ending_in_s', net_obj.plural)
491 def test_scope(self):
492 net_obj = copy.deepcopy(self.net_obj)
493 net_obj._crd = mock.Mock()
494 net_obj._crd.spec.scope = 'Cluster'
495 self.assertEqual('Cluster', net_obj.scope)
497 @mock.patch.object(kubernetes_utils, 'create_network')
498 def test_create(self, mock_create_network):
499 net_obj = copy.deepcopy(self.net_obj)
500 net_obj._scope = 'scope'
501 net_obj._group = 'group'
502 net_obj._version = 'version'
503 net_obj._plural = 'plural'
504 net_obj._template = 'template'
506 mock_create_network.assert_called_once_with(
507 'scope', 'group', 'version', 'plural', 'template')
509 @mock.patch.object(kubernetes_utils, 'delete_network')
510 def test_delete(self, mock_delete_network):
511 net_obj = copy.deepcopy(self.net_obj)
512 net_obj._scope = 'scope'
513 net_obj._group = 'group'
514 net_obj._version = 'version'
515 net_obj._plural = 'plural'
516 net_obj._name = 'name'
518 mock_delete_network.assert_called_once_with(
519 'scope', 'group', 'version', 'plural', 'name')
522 class ServiceNodePortObjectTestCase(base.BaseUnitTestCase):
524 def test__init(self):
525 with mock.patch.object(kubernetes.ServiceNodePortObject, '_add_port') \
527 kubernetes.ServiceNodePortObject(
528 'fake_name', node_ports=[{'port': 80, 'name': 'web'}])
530 mock_add_port.assert_has_calls([mock.call(22, 'ssh', protocol='TCP'),
531 mock.call(80, 'web')])
533 @mock.patch.object(kubernetes.ServiceNodePortObject, '_add_port')
534 def test__init_missing_mandatory_parameters(self, *args):
535 with self.assertRaises(
536 exceptions.KubernetesServiceObjectDefinitionError):
537 kubernetes.ServiceNodePortObject(
538 'fake_name', node_ports=[{'port': 80}])
539 with self.assertRaises(
540 exceptions.KubernetesServiceObjectDefinitionError):
541 kubernetes.ServiceNodePortObject(
542 'fake_name', node_ports=[{'name': 'web'}])
544 @mock.patch.object(kubernetes.ServiceNodePortObject, '_add_port')
545 def test__init_missing_bad_name(self, *args):
546 with self.assertRaises(
547 exceptions.KubernetesServiceObjectNameError):
548 kubernetes.ServiceNodePortObject(
549 'fake_name', node_ports=[{'port': 80, 'name': '-web'}])
550 with self.assertRaises(
551 exceptions.KubernetesServiceObjectNameError):
552 kubernetes.ServiceNodePortObject(
553 'fake_name', node_ports=[{'port': 80, 'name': 'Web'}])
554 with self.assertRaises(
555 exceptions.KubernetesServiceObjectNameError):
556 kubernetes.ServiceNodePortObject(
557 'fake_name', node_ports=[{'port': 80, 'name': 'web-'}])
559 def test__add_port(self):
560 nodeport_object = kubernetes.ServiceNodePortObject('fake_name')
561 port_ssh = {'name': 'ssh',
564 port_definition = {'port': 80,
569 port = copy.deepcopy(port_definition)
570 _port = port.pop('port')
571 name = port.pop('name')
572 nodeport_object._add_port(_port, name, **port)
573 self.assertEqual([port_ssh, port_definition],
574 nodeport_object.template['spec']['ports'])
576 @mock.patch.object(kubernetes_utils, 'create_service')
577 def test_create(self, mock_create_service):
578 nodeport_object = kubernetes.ServiceNodePortObject('fake_name')
579 nodeport_object.template = 'fake_template'
580 nodeport_object.create()
581 mock_create_service.assert_called_once_with('fake_template')
583 @mock.patch.object(kubernetes_utils, 'delete_service')
584 def test_delete(self, mock_delete_service):
585 nodeport_object = kubernetes.ServiceNodePortObject('fake_name')
586 nodeport_object.delete()
587 mock_delete_service.assert_called_once_with('fake_name-service')
590 class KubernetesTemplate(base.BaseUnitTestCase):
592 def test_get_rc_by_name(self):
595 'host1': {'args': 'some data'}
598 k_template = kubernetes.KubernetesTemplate('k8s_name', ctx_cfg)
599 rc = k_template.get_rc_by_name('host1-k8s_name')
600 self.assertTrue(isinstance(rc, kubernetes.ReplicationControllerObject))
602 def test_get_rc_by_name_wrong_name(self):
605 'host1': {'args': 'some data'}
608 k_template = kubernetes.KubernetesTemplate('k8s_name', ctx_cfg)
609 self.assertIsNone(k_template.get_rc_by_name('wrong_host_name'))
611 def test_get_rc_by_name_no_rcs(self):
612 ctx_cfg = {'servers': {}}
613 k_template = kubernetes.KubernetesTemplate('k8s_name', ctx_cfg)
614 self.assertIsNone(k_template.get_rc_by_name('any_host_name'))