1 ##############################################################################
2 # Copyright (c) 2017 Intel Corporation
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
14 from yardstick.common import exceptions
15 from yardstick.common import kubernetes_utils
16 from yardstick.orchestrator import kubernetes
17 from yardstick.tests.unit import base
20 class GetTemplateTestCase(base.BaseUnitTestCase):
22 def test_get_template(self):
25 "kind": "ReplicationController",
27 "name": "host-k8s-86096c30"
34 "app": "host-k8s-86096c30"
42 "chmod 700 ~/.ssh; chmod 600 ~/.ssh/*; \
43 service ssh restart;while true ; do sleep 10000; done"
48 "image": "openretriever/yardstick",
49 "name": "host-k8s-86096c30-container",
52 "mountPath": "/tmp/.ssh/",
53 "name": "k8s-86096c30-key",
62 "name": "k8s-86096c30-key"
64 "name": "k8s-86096c30-key"
68 "kubernetes.io/hostname": "node-01"
70 "restartPolicy": "Always",
72 {"operator": "Exists"}
79 'command': '/bin/bash',
80 'args': ['-c', 'chmod 700 ~/.ssh; chmod 600 ~/.ssh/*; \
81 service ssh restart;while true ; do sleep 10000; done'],
82 'ssh_key': 'k8s-86096c30-key',
83 'nodeSelector': {'kubernetes.io/hostname': 'node-01'},
85 'restartPolicy': 'Always'
87 name = 'host-k8s-86096c30'
88 output_r = kubernetes.ReplicationControllerObject(
89 name, **input_s).get_template()
90 self.assertEqual(output_r, output_t)
def test_get_template_invalid_restart_policy(self):
    # Building the object and rendering its template with an unknown
    # 'restartPolicy' value is expected to raise.
    rc_name = 'host-k8s-86096c30'
    with self.assertRaises(exceptions.KubernetesWrongRestartPolicy):
        rc_obj = kubernetes.ReplicationControllerObject(
            rc_name, restartPolicy='invalid_option')
        rc_obj.get_template()
100 class GetRcPodsTestCase(base.BaseUnitTestCase):
102 @mock.patch('yardstick.orchestrator.kubernetes.k8s_utils.get_pod_list')
103 def test_get_rc_pods(self, mock_get_pod_list):
106 'image': 'openretriever/yardstick',
107 'command': '/bin/bash',
108 'args': ['-c', 'chmod 700 ~/.ssh; chmod 600 ~/.ssh/*; \
109 service ssh restart;while true ; do sleep 10000; done']
112 'image': 'openretriever/yardstick',
113 'command': '/bin/bash',
114 'args': ['-c', 'chmod 700 ~/.ssh; chmod 600 ~/.ssh/*; \
115 service ssh restart;while true ; do sleep 10000; done']
118 k8s_template = kubernetes.KubernetesTemplate('k8s-86096c30', servers)
119 mock_get_pod_list.return_value.items = []
120 pods = k8s_template.get_rc_pods()
121 self.assertEqual(pods, [])
124 class ReplicationControllerObjectTestCase(base.BaseUnitTestCase):
def test__init_one_container(self):
    # Container settings passed directly as keyword arguments (no
    # 'containers' list) must produce exactly one container object.
    rc_obj = kubernetes.ReplicationControllerObject(
        'pod_name', args=['arg1', 'arg2'], image='fake_image',
        command='fake_command')
    self.assertEqual(1, len(rc_obj._containers))

    single = rc_obj._containers[0]
    self.assertEqual(['arg1', 'arg2'], single._args)
    self.assertEqual('fake_image', single._image)
    # The string command is expected back wrapped in a list.
    self.assertEqual(['fake_command'], single._command)
    self.assertEqual([], single._volume_mounts)
139 def test__init_multipe_containers(self):
140 pod_name = 'pod_name'
143 containers.append({'args': ['arg1', 'arg2'],
144 'image': 'fake_image_%s' % i,
145 'command': 'fake_command_%s' % i})
146 _kwargs = {'containers': containers}
147 k8s_obj = kubernetes.ReplicationControllerObject(pod_name, **_kwargs)
148 self.assertEqual(5, len(k8s_obj._containers))
150 container = k8s_obj._containers[i]
151 self.assertEqual(['arg1', 'arg2'], container._args)
152 self.assertEqual('fake_image_%s' % i, container._image)
153 self.assertEqual(['fake_command_%s' % i], container._command)
154 self.assertEqual([], container._volume_mounts)
156 def test__add_volumes(self):
157 volume1 = {'name': 'fake_sshkey',
158 'configMap': {'name': 'fake_sshkey'}}
159 volume2 = {'name': 'volume2',
161 k8s_obj = kubernetes.ReplicationControllerObject(
162 'name', ssh_key='fake_sshkey', volumes=[volume2])
163 k8s_obj._add_volumes()
164 volumes = k8s_obj.template['spec']['template']['spec']['volumes']
165 self.assertEqual(sorted([volume1, volume2], key=lambda k: k['name']),
166 sorted(volumes, key=lambda k: k['name']))
def test__add_volumes_no_volumes(self):
    # Only an ssh key is supplied, so the ssh key volume is the single
    # entry expected in the pod spec.
    rc_obj = kubernetes.ReplicationControllerObject(
        'name', ssh_key='fake_sshkey')
    rc_obj._add_volumes()

    pod_spec = rc_obj.template['spec']['template']['spec']
    ssh_key_volume = {'name': 'fake_sshkey',
                      'configMap': {'name': 'fake_sshkey'}}
    self.assertEqual([ssh_key_volume], pod_spec['volumes'])
def test__create_ssh_key_volume(self):
    # The ssh key volume is a configMap volume named after the ssh key.
    rc_obj = kubernetes.ReplicationControllerObject(
        'name', ssh_key='fake_sshkey')
    ssh_key_volume = rc_obj._create_ssh_key_volume()
    self.assertEqual(
        {'name': 'fake_sshkey', 'configMap': {'name': 'fake_sshkey'}},
        ssh_key_volume)
184 def test__create_volume_item(self):
185 for vol_type in kubernetes_utils.get_volume_types():
186 volume = {'name': 'vol_name',
190 kubernetes.ReplicationControllerObject.
191 _create_volume_item(volume))
def test__create_volume_item_invalid_type(self):
    # 'invalid_type' is the only non-name key of this volume definition,
    # and it is expected to be rejected.
    bad_volume = {'name': 'vol_name', 'invalid_type': 'data'}
    rc_class = kubernetes.ReplicationControllerObject
    with self.assertRaises(exceptions.KubernetesTemplateInvalidVolumeType):
        rc_class._create_volume_item(bad_volume)
199 def test__add_security_context(self):
200 k8s_obj = kubernetes.ReplicationControllerObject('pod_name')
201 self.assertNotIn('securityContext',
202 k8s_obj.template['spec']['template']['spec'])
204 k8s_obj._security_context = {'key_pod': 'value_pod'}
205 k8s_obj._add_security_context()
207 {'key_pod': 'value_pod'},
208 k8s_obj.template['spec']['template']['spec']['securityContext'])
210 def test__add_security_context_by_init(self):
214 {'securityContext': {'key%s' % i: 'value%s' % i}})
215 _kwargs = {'containers': containers,
216 'securityContext': {'key_pod': 'value_pod'}}
217 k8s_obj = kubernetes.ReplicationControllerObject('pod_name', **_kwargs)
219 {'key_pod': 'value_pod'},
220 k8s_obj.template['spec']['template']['spec']['securityContext'])
223 k8s_obj.template['spec']['template']['spec']['containers'][i])
224 self.assertEqual({'key%s' % i: 'value%s' % i},
225 container['securityContext'])
227 def test__add_networks(self):
228 k8s_obj = kubernetes.ReplicationControllerObject(
229 'name', networks=['network1', 'network2', 'network3'])
230 k8s_obj._add_networks()
232 template['spec']['template']['metadata']['annotations']['networks']
233 expected = ('[{"name": "network1"}, {"name": "network2"}, '
234 '{"name": "network3"}]')
235 self.assertEqual(expected, networks)
237 def test__add_tolerations(self):
238 _kwargs = {'tolerations': [{'key': 'key1',
241 'operator': 'operator4',
242 'wrong_key': 'error_key'}]
244 k8s_obj = kubernetes.ReplicationControllerObject('pod_name', **_kwargs)
245 k8s_obj._add_tolerations()
246 _tol = k8s_obj.template['spec']['template']['spec']['tolerations']
247 self.assertEqual(1, len(_tol))
248 self.assertEqual({'key': 'key1',
251 'operator': 'operator4'},
def test__add_tolerations_default(self):
    # No tolerations are passed in; a single {'operator': 'Exists'}
    # entry is expected in the pod spec.
    rc_obj = kubernetes.ReplicationControllerObject('pod_name')
    rc_obj._add_tolerations()

    pod_spec = rc_obj.template['spec']['template']['spec']
    tolerations = pod_spec['tolerations']
    self.assertEqual(1, len(tolerations))
    self.assertEqual({'operator': 'Exists'}, tolerations[0])
262 class ContainerObjectTestCase(base.BaseUnitTestCase):
264 def test__create_volume_mounts(self):
265 volume_mount = {'name': 'fake_name',
266 'mountPath': 'fake_path'}
267 ssh_vol = {'name': 'fake_ssh_key',
268 'mountPath': kubernetes.ContainerObject.SSH_MOUNT_PATH,
270 expected = copy.deepcopy(volume_mount)
271 expected['readOnly'] = False
272 expected = [expected, ssh_vol]
273 container_obj = kubernetes.ContainerObject(
274 'cname', 'fake_ssh_key', volumeMounts=[volume_mount])
275 output = container_obj._create_volume_mounts()
276 self.assertEqual(expected, output)
278 def test__create_volume_mounts_no_volume_mounts(self):
279 ssh_vol = {'name': 'fake_ssh_key2',
280 'mountPath': kubernetes.ContainerObject.SSH_MOUNT_PATH,
282 container_obj = kubernetes.ContainerObject('name', 'fake_ssh_key2')
283 output = container_obj._create_volume_mounts()
284 self.assertEqual([ssh_vol], output)
286 def test__create_volume_mounts_item(self):
287 volume_mount = {'name': 'fake_name',
288 'mountPath': 'fake_path'}
289 expected = copy.deepcopy(volume_mount)
290 expected['readOnly'] = False
291 output = kubernetes.ContainerObject._create_volume_mounts_item(
293 self.assertEqual(expected, output)
295 def test_get_container_item(self):
296 volume_mount = {'name': 'fake_name',
297 'mountPath': 'fake_path'}
298 args = ['arg1', 'arg2']
299 container_obj = kubernetes.ContainerObject(
300 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
302 expected = {'args': args,
303 'command': kubernetes.ContainerObject.COMMAND_DEFAULT,
304 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
305 'name': 'cname-container',
306 'volumeMounts': container_obj._create_volume_mounts()}
307 self.assertEqual(expected, container_obj.get_container_item())
def test_get_container_item_with_security_context(self):
    # The container item must carry the 'securityContext' given at
    # construction time, next to the default command and image.
    volume_mount = {'name': 'fake_name',
                    'mountPath': 'fake_path'}
    args = ['arg1', 'arg2']
    # The keyword is 'volumeMounts' (plural), as used by the volume mount
    # tests of this class; the previous 'volumeMount' spelling meant the
    # mount fixture never reached the object under test.
    container_obj = kubernetes.ContainerObject(
        'cname', ssh_key='fake_sshkey', volumeMounts=[volume_mount],
        args=args, securityContext={'key': 'value'})
    expected = {'args': args,
                'command': kubernetes.ContainerObject.COMMAND_DEFAULT,
                'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
                'name': 'cname-container',
                'volumeMounts': container_obj._create_volume_mounts(),
                'securityContext': {'key': 'value'}}
    self.assertEqual(expected, container_obj.get_container_item())
def test_get_container_item_with_env(self):
    # The environment variable list given at construction time must be
    # present in the container item under 'env'.
    volume_mount = {'name': 'fake_name',
                    'mountPath': 'fake_path'}
    args = ['arg1', 'arg2']
    # The keyword is 'volumeMounts' (plural), as used by the volume mount
    # tests of this class; the previous 'volumeMount' spelling meant the
    # mount fixture never reached the object under test.
    container_obj = kubernetes.ContainerObject(
        'cname', ssh_key='fake_sshkey', volumeMounts=[volume_mount],
        args=args, env=[{'name': 'fake_var_name',
                         'value': 'fake_var_value'}])
    expected = {'args': args,
                'command': kubernetes.ContainerObject.COMMAND_DEFAULT,
                'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
                'name': 'cname-container',
                'volumeMounts': container_obj._create_volume_mounts(),
                'env': [{'name': 'fake_var_name',
                         'value': 'fake_var_value'}]}
    self.assertEqual(expected, container_obj.get_container_item())
341 def test_get_container_item_with_ports_multi_parameter(self):
342 volume_mount = {'name': 'fake_name',
343 'mountPath': 'fake_path'}
344 args = ['arg1', 'arg2']
345 container_obj = kubernetes.ContainerObject(
346 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
347 args=args, ports=[{'containerPort': 'fake_port_name',
348 'hostPort': 'fake_host_port',
350 'protocol': 'fake_protocol',
351 'invalid_varible': 'fakeinvalid_varible',
352 'hostIP': 'fake_port_number'}])
353 expected = {'args': args,
354 'command': kubernetes.ContainerObject.COMMAND_DEFAULT,
355 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
356 'name': 'cname-container',
357 'volumeMounts': container_obj._create_volume_mounts(),
358 'ports': [{'containerPort': 'fake_port_name',
359 'hostPort': 'fake_host_port',
361 'protocol': 'fake_protocol',
362 'hostIP': 'fake_port_number'}]}
363 self.assertEqual(expected, container_obj.get_container_item())
365 def test_get_container_item_with_ports_no_container_port(self):
366 with self.assertRaises(exceptions.KubernetesContainerPortNotDefined):
367 volume_mount = {'name': 'fake_name',
368 'mountPath': 'fake_path'}
369 args = ['arg1', 'arg2']
370 container_obj = kubernetes.ContainerObject(
371 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
372 args=args, ports=[{'hostPort': 'fake_host_port',
374 'protocol': 'fake_protocol',
375 'hostIP': 'fake_port_number'}])
376 container_obj.get_container_item()
def test_get_container_item_with_resources(self):
    # Only the 'requests' and 'limits' sections of the resources
    # definition are expected in the container item; 'other_key' must be
    # left out.
    volume_mount = {'name': 'fake_name',
                    'mountPath': 'fake_path'}
    args = ['arg1', 'arg2']
    resources = {'requests': {'key1': 'val1'},
                 'limits': {'key2': 'val2'},
                 'other_key': {'key3': 'val3'}}
    # The keyword is 'volumeMounts' (plural), as used by the volume mount
    # tests of this class; the previous 'volumeMount' spelling meant the
    # mount fixture never reached the object under test.
    container_obj = kubernetes.ContainerObject(
        'cname', ssh_key='fake_sshkey', volumeMounts=[volume_mount],
        args=args, resources=resources)
    expected = {'args': args,
                'command': kubernetes.ContainerObject.COMMAND_DEFAULT,
                'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
                'name': 'cname-container',
                'volumeMounts': container_obj._create_volume_mounts(),
                'resources': {'requests': {'key1': 'val1'},
                              'limits': {'key2': 'val2'}}}
    self.assertEqual(expected, container_obj.get_container_item())
def test_get_container_item_image_pull_policy(self):
    # 'imagePullPolicy' given at construction time must show up unchanged
    # in the container item; no args are passed, so 'args' is empty.
    container_cls = kubernetes.ContainerObject
    container = container_cls(
        'cname', ssh_key='fake_sshkey', imagePullPolicy='Always')
    expected_item = {
        'args': [],
        'command': container_cls.COMMAND_DEFAULT,
        'image': container_cls.IMAGE_DEFAULT,
        'name': 'cname-container',
        'volumeMounts': container._create_volume_mounts(),
        'imagePullPolicy': 'Always',
    }
    self.assertEqual(expected_item, container.get_container_item())
408 def test_get_container_item_with_tty_stdin(self):
409 args = ['arg1', 'arg2']
410 container_obj = kubernetes.ContainerObject(
411 'cname', 'fake_sshkey', args=args, tty=False, stdin=True)
412 expected = {'args': args,
413 'command': kubernetes.ContainerObject.COMMAND_DEFAULT,
414 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
415 'name': 'cname-container',
416 'volumeMounts': container_obj._create_volume_mounts(),
419 self.assertEqual(expected, container_obj.get_container_item())
def test__parse_commands_string(self):
    # A single command string is expected back as a one-element list.
    container = kubernetes.ContainerObject('cname', 'fake_sshkey')
    parsed = container._parse_commands('fake command')
    self.assertEqual(['fake command'], parsed)
def test__parse_commands_list(self):
    # A list of commands is expected back with the same items.
    container = kubernetes.ContainerObject('cname', 'fake_sshkey')
    parsed = container._parse_commands(['cmd1', 'cmd2'])
    self.assertEqual(['cmd1', 'cmd2'], parsed)
def test__parse_commands_exception(self):
    # A dict is not an accepted command type and must be rejected.
    container = kubernetes.ContainerObject('cname', 'fake_sshkey')
    invalid_commands = {}
    with self.assertRaises(exceptions.KubernetesContainerCommandType):
        container._parse_commands(invalid_commands)
437 class CustomResourceDefinitionObjectTestCase(base.BaseUnitTestCase):
439 def test__init(self):
442 'name': 'newcrds.ctx_name.com'
445 'group': 'ctx_name.com',
448 'names': {'plural': 'newcrds',
449 'singular': 'newcrd',
453 crd_obj = kubernetes.CustomResourceDefinitionObject(
454 'ctx_name', name='newcrd', version='v2', scope='scope')
455 self.assertEqual('newcrds.ctx_name.com', crd_obj._name)
456 self.assertEqual(template, crd_obj._template)
458 def test__init_missing_parameter(self):
459 with self.assertRaises(exceptions.KubernetesCRDObjectDefinitionError):
460 kubernetes.CustomResourceDefinitionObject('ctx_name',
464 class NetworkObjectTestCase(base.BaseUnitTestCase):
467 self.net_obj = kubernetes.NetworkObject(name='fake_name',
468 plugin='fake_plugin',
def test__init_missing_parameter(self):
    # Passing only 'plugin' or only 'args' is not enough to define a
    # network object; each incomplete set must be rejected.
    expected_error = exceptions.KubernetesNetworkObjectDefinitionError
    for incomplete_kwargs in ({'plugin': 'plugin'}, {'args': 'args'}):
        with self.assertRaises(expected_error):
            kubernetes.NetworkObject('network_name', **incomplete_kwargs)
@mock.patch.object(kubernetes_utils, 'get_custom_resource_definition')
def test_crd(self, mock_get_crd):
    # The 'crd' property must hand back whatever the (mocked)
    # kubernetes_utils lookup returns.
    fake_crd = mock.Mock()
    mock_get_crd.return_value = fake_crd
    network = copy.deepcopy(self.net_obj)
    self.assertEqual(fake_crd, network.crd)
486 def test_template(self):
487 net_obj = copy.deepcopy(self.net_obj)
488 expected = {'apiVersion': 'group.com/v2',
489 'kind': kubernetes.NetworkObject.KIND,
491 'name': 'fake_name'},
492 'plugin': 'fake_plugin',
495 crd.spec.group = 'group.com'
496 crd.spec.version = 'v2'
498 self.assertEqual(expected, net_obj.template)
def test_group(self):
    # 'group' is expected to come from the CRD's spec.group.
    fake_crd = mock.Mock()
    fake_crd.spec.group = 'fake_group'
    network = copy.deepcopy(self.net_obj)
    network._crd = fake_crd
    self.assertEqual('fake_group', network.group)
def test_version(self):
    # 'version' is expected to come from the CRD's spec.version.
    fake_crd = mock.Mock()
    fake_crd.spec.version = 'version_4'
    network = copy.deepcopy(self.net_obj)
    network._crd = fake_crd
    self.assertEqual('version_4', network.version)
def test_plural(self):
    # 'plural' is expected to come from the CRD's spec.names.plural.
    fake_crd = mock.Mock()
    fake_crd.spec.names.plural = 'name_ending_in_s'
    network = copy.deepcopy(self.net_obj)
    network._crd = fake_crd
    self.assertEqual('name_ending_in_s', network.plural)
def test_scope(self):
    # 'scope' is expected to come from the CRD's spec.scope.
    fake_crd = mock.Mock()
    fake_crd.spec.scope = 'Cluster'
    network = copy.deepcopy(self.net_obj)
    network._crd = fake_crd
    self.assertEqual('Cluster', network.scope)
524 @mock.patch.object(kubernetes_utils, 'create_network')
525 def test_create(self, mock_create_network):
526 net_obj = copy.deepcopy(self.net_obj)
527 net_obj._scope = 'scope'
528 net_obj._group = 'group'
529 net_obj._version = 'version'
530 net_obj._plural = 'plural'
531 net_obj._template = 'template'
533 mock_create_network.assert_called_once_with(
534 'scope', 'group', 'version', 'plural', 'template')
536 @mock.patch.object(kubernetes_utils, 'delete_network')
537 def test_delete(self, mock_delete_network):
538 net_obj = copy.deepcopy(self.net_obj)
539 net_obj._scope = 'scope'
540 net_obj._group = 'group'
541 net_obj._version = 'version'
542 net_obj._plural = 'plural'
543 net_obj._name = 'name'
545 mock_delete_network.assert_called_once_with(
546 'scope', 'group', 'version', 'plural', 'name')
549 class ServiceNodePortObjectTestCase(base.BaseUnitTestCase):
551 def test__init(self):
552 with mock.patch.object(kubernetes.ServiceNodePortObject, '_add_port') \
554 kubernetes.ServiceNodePortObject(
555 'fake_name', node_ports=[{'port': 80, 'name': 'web'}])
557 mock_add_port.assert_has_calls([mock.call(22, 'ssh', protocol='TCP'),
558 mock.call(80, 'web')])
@mock.patch.object(kubernetes.ServiceNodePortObject, '_add_port')
def test__init_missing_mandatory_parameters(self, *args):
    # A node port definition needs both 'port' and 'name'; each entry
    # below omits one of them and must be rejected.
    expected_error = exceptions.KubernetesServiceObjectDefinitionError
    for incomplete_port in ({'port': 80}, {'name': 'web'}):
        with self.assertRaises(expected_error):
            kubernetes.ServiceNodePortObject(
                'fake_name', node_ports=[incomplete_port])
@mock.patch.object(kubernetes.ServiceNodePortObject, '_add_port')
def test__init_missing_bad_name(self, *args):
    # Each of these port names (leading dash, upper-case letter, trailing
    # dash) is expected to be rejected as an invalid name.
    expected_error = exceptions.KubernetesServiceObjectNameError
    for bad_name in ('-web', 'Web', 'web-'):
        with self.assertRaises(expected_error):
            kubernetes.ServiceNodePortObject(
                'fake_name', node_ports=[{'port': 80, 'name': bad_name}])
586 def test__add_port(self):
587 nodeport_object = kubernetes.ServiceNodePortObject('fake_name')
588 port_ssh = {'name': 'ssh',
591 port_definition = {'port': 80,
596 port = copy.deepcopy(port_definition)
597 _port = port.pop('port')
598 name = port.pop('name')
599 nodeport_object._add_port(_port, name, **port)
600 self.assertEqual([port_ssh, port_definition],
601 nodeport_object.template['spec']['ports'])
@mock.patch.object(kubernetes_utils, 'create_service')
def test_create(self, mock_create_service):
    # create() must pass the object's template to the (mocked)
    # kubernetes_utils.create_service exactly once.
    service = kubernetes.ServiceNodePortObject('fake_name')
    service.template = 'fake_template'

    service.create()

    mock_create_service.assert_called_once_with('fake_template')
610 @mock.patch.object(kubernetes_utils, 'delete_service')
611 def test_delete(self, mock_delete_service):
612 nodeport_object = kubernetes.ServiceNodePortObject('fake_name')
613 nodeport_object.delete()
614 mock_delete_service.assert_called_once_with('fake_name-service',
618 class KubernetesTemplate(base.BaseUnitTestCase):
620 def test_get_rc_by_name(self):
623 'host1': {'args': 'some data'}
626 k_template = kubernetes.KubernetesTemplate('k8s_name', ctx_cfg)
627 rc = k_template.get_rc_by_name('host1-k8s_name')
628 self.assertTrue(isinstance(rc, kubernetes.ReplicationControllerObject))
630 def test_get_rc_by_name_wrong_name(self):
633 'host1': {'args': 'some data'}
636 k_template = kubernetes.KubernetesTemplate('k8s_name', ctx_cfg)
637 self.assertIsNone(k_template.get_rc_by_name('wrong_host_name'))
def test_get_rc_by_name_no_rcs(self):
    # The context defines no servers, so the lookup is expected to
    # return None.
    template = kubernetes.KubernetesTemplate('k8s_name', {'servers': {}})
    found = template.get_rc_by_name('any_host_name')
    self.assertIsNone(found)