namespace='default',
wait=False,
**kwargs): # pragma: no cover
+ # pylint: disable=unused-argument
core_v1_api = get_core_api()
metadata = client.V1ObjectMeta(**template.get('metadata', {}))
**kwargs): # pragma: no cover
core_v1_api = get_core_api()
try:
- core_v1_api.delete_namespaced_service(name, namespace, **kwargs)
+ body = client.V1DeleteOptions()
+ core_v1_api.delete_namespaced_service(name, namespace, body, **kwargs)
except ApiException:
LOG.exception('Delete Service failed')
namespace='default',
wait=False,
**kwargs): # pragma: no cover
-
+ # pylint: disable=unused-argument
core_v1_api = get_core_api()
try:
core_v1_api.create_namespaced_replication_controller(namespace,
namespace='default',
wait=False,
**kwargs): # pragma: no cover
-
+ # pylint: disable=unused-argument
core_v1_api = get_core_api()
body = kwargs.get('body', client.V1DeleteOptions())
kwargs.pop('body', None)
namespace='default',
wait=False,
**kwargs): # pragma: no cover
-
+ # pylint: disable=unused-argument
core_v1_api = get_core_api()
body = kwargs.get('body', client.V1DeleteOptions())
kwargs.pop('body', None)
def read_pod_status(name, namespace='default', **kwargs): # pragma: no cover
+ # pylint: disable=unused-argument
return read_pod(name).status.phase
namespace='default',
wait=False,
**kwargs): # pragma: no cover
+ # pylint: disable=unused-argument
core_v1_api = get_core_api()
metadata = client.V1ObjectMeta(name=name)
body = client.V1ConfigMap(data=data, metadata=metadata)
namespace='default',
wait=False,
**kwargs): # pragma: no cover
+ # pylint: disable=unused-argument
core_v1_api = get_core_api()
body = kwargs.get('body', client.V1DeleteOptions())
kwargs.pop('body', None)
def get_pod_by_name(name): # pragma: no cover
pod_list = get_pod_list()
return next((n for n in pod_list.items if n.metadata.name.startswith(name)), None)
+
+
+def get_volume_types():
+ """Return the "volume" types supported by the current API"""
+ return [vtype for vtype in client.V1Volume.attribute_map.values()
+ if vtype != 'name']