pipeline = get_configuration("components/pipeline")
logger.debug("pipeline={}".format(pipeline))
components = pipeline.get("components/pipeline")
- components.pop('interface')
+ if 'interface' in components:
+ components.pop('interface')
+ else:
+ raise exceptions.ConsulComponentContentError("error= Components pipeline has no interface")
return components
'GET', 'http://consul:8500/v1/kv/{}'.format(component),
json=[{'Key': component, 'Value': comp_util.get_b64_conf(component)}]
)
- m.register_uri(
- 'GET', 'http://consul:8500/v1/kv/components/port_start',
- json=[{'Key': 'components/port_start', 'Value': comp_util.get_b64_conf("components/port_start")}]
- )
+
m.register_uri(
'PUT', 'http://consul:8500/v1/kv/components/port_start',
json=[]
plugin = configuration.get_plugins()
assert plugin is not None
-
+def test_get_plugins_failure(no_requests):
+ from python_moonutilities import configuration
+ no_requests.register_uri(
+ 'GET', 'http://consul:8500/v1/kv/components/pipeline',
+ json=[{'Key': 'components/pipeline', 'Value': 'eyJjb250YWluZXIiOiAid3Vrb25nc3VuL21vb25fYXV0aHo6djQuMyIsICJwb3J0IjogODA4MX0='}]
+ )
+ with pytest.raises(Exception) as exception_info:
+ configuration.get_plugins()
+ assert str(exception_info.value) == '500: Consul Content error'
################################ component ####################################
def test_get_components():
from python_moonutilities import configuration