Calipso uses Sensu framework for Monitoring. It automatically deploys and configures the necessary config files on all hosts, writes customized checks and handlers to setup monitoring per inventory object, as defined in the Calipso virtual networking discovery model.
After collecting the data from processes and workers provisioned by the cloud management systems, Calipso dynamically checks for health and availability, as a baseline for SLA monitoring.
Calipso allows networking administrators to operate, plan for maintenance or troubleshooting and provides an easy to use hierarchical representation of all the virtual networking components.
-Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems)
-and others.
+
+Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) and others.
+
+The following wonderful and very talented engineers have been coding, at various times, and contributing to Calipso's development:
+
+Ilia Abashin - iabashin@cisco.com
+Eyal Lapid - eyal.lapid@protonmail.com
+Xiaocong Dong - buptdxc@gmail.com
+Stas Isakov - iabashin@cisco.com
+Ofir Ashery - oashery@cisco.com
+
"/aggregates": "resource.aggregates.Aggregates",
"/environment_configs":
"resource.environment_configs.EnvironmentConfigs",
+ "/connection_tests": "resource.connection_tests.ConnectionTests",
"/auth/tokens": "auth.tokens.Tokens"
}
requirement=self.AGGREGATE_TYPES,
mandatory=True,
error_messages={"mandatory":
- "type must be specified: " +
- "environment/" +
- " message/" +
- "constant"})
+ "type must be specified: (" +
+ ",".join(self.AGGREGATE_TYPES) +
+ ")"})
}
self.validate_query_data(filters, filters_requirements)
query = self.build_query(filters)
filters = self.parse_query_params(req)
focal_point_types = self.get_constants_by_name("object_types")
filters_requirements = {
- 'id': self.require(ObjectId, True),
- 'focal_point_type': self.require(str, False, DataValidate.LIST,
- focal_point_types),
+ 'id': self.require(ObjectId, convert_to_type=True),
+ 'focal_point_type': self.require(str,
+ validate=DataValidate.LIST,
+ requirement=focal_point_types),
'constraint': self.require([list, str]),
- 'page': self.require(int, True),
- 'page_size': self.require(int, True)
+ 'page': self.require(int, convert_to_type=True),
+ 'page_size': self.require(int, convert_to_type=True)
}
self.validate_query_data(filters, filters_requirements)
page, page_size = self.get_pagination(filters)
else:
clique_constraints_ids = self.get_objects_list(self.COLLECTION,
query,
- page, page_size, self.PROJECTION)
+ page, page_size,
+ self.PROJECTION)
self.set_successful_response(
resp, {"clique_constraints": clique_constraints_ids}
)
self.ID: True,
"focal_point_type": True,
"link_types": True,
- "environment": True
+ "environment": True,
+ "name": True
}
def on_get(self, req, resp):
link_types = self.get_constants_by_name("link_types")
filters_requirements = {
'env_name': self.require(str, mandatory=True),
- 'id': self.require(ObjectId, True),
+ 'id': self.require(ObjectId, convert_to_type=True),
'focal_point_type': self.require(str,
validate=DataValidate.LIST,
requirement=focal_point_types),
'link_type': self.require([list, str],
validate=DataValidate.LIST,
requirement=link_types),
- 'page': self.require(int, True),
- 'page_size': self.require(int, True)
+ 'name': self.require(str),
+ 'page': self.require(int, convert_to_type=True),
+ 'page_size': self.require(int, convert_to_type=True)
}
self.validate_query_data(filters, filters_requirements)
link_types = self.get_constants_by_name("link_types")
clique_type_requirements = {
'environment': self.require(str, mandatory=True),
- 'focal_point_type': self.require(str, False, DataValidate.LIST,
- focal_point_types, True),
- 'link_types': self.require(list, False, DataValidate.LIST,
- link_types, True),
+ 'focal_point_type': self.require(str,
+ mandatory=True,
+ validate=DataValidate.LIST,
+ requirement=focal_point_types),
+ 'link_types': self.require(list,
+ mandatory=True,
+ validate=DataValidate.LIST,
+ requirement=link_types),
'name': self.require(str, mandatory=True)
}
env_name = clique_type['environment']
if not self.check_environment_name(env_name):
- self.bad_request("unkown environment: " + env_name)
+ self.bad_request("unknown environment: " + env_name)
self.write(clique_type, self.COLLECTION)
self.set_successful_response(resp,
def build_query(self, filters):
query = {}
- filters_keys = ['focal_point_type']
+ filters_keys = ['name', 'focal_point_type']
self.update_query_with_filters(filters, filters_keys, query)
link_types = filters.get('link_type')
if link_types:
link_types = self.get_constants_by_name("link_types")
filters_requirements = {
'env_name': self.require(str, mandatory=True),
- 'id': self.require(ObjectId, True),
- 'focal_point': self.require(ObjectId, True),
- 'focal_point_type': self.require(str, validate=DataValidate.LIST,
+ 'id': self.require(ObjectId, convert_to_type=True),
+ 'focal_point': self.require(ObjectId, convert_to_type=True),
+ 'focal_point_type': self.require(str,
+ validate=DataValidate.LIST,
requirement=focal_point_types),
- 'link_type': self.require(str, validate=DataValidate.LIST,
+ 'link_type': self.require(str,
+ validate=DataValidate.LIST,
requirement=link_types),
- 'link_id': self.require(ObjectId, True),
- 'page': self.require(int, True),
- 'page_size': self.require(int, True)
+ 'link_id': self.require(ObjectId, convert_to_type=True),
+ 'page': self.require(int, convert_to_type=True),
+ 'page_size': self.require(int, convert_to_type=True)
}
self.validate_query_data(filters, filters_requirements)
page, page_size = self.get_pagination(filters)
--- /dev/null
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+import datetime
+
+from bson import ObjectId
+
+from api.responders.resource.environment_configs import EnvironmentConfigs
+from api.responders.responder_base import ResponderBase
+from api.validation.data_validate import DataValidate
+
+
+class ConnectionTests(ResponderBase):
+    """REST API responder for connection tests.
+
+    on_get returns either a single connection test document (when 'id'
+    is supplied), optionally narrowing its targets/results, or a
+    paginated list of tests for an environment. on_post validates and
+    stores a new connection test request document.
+    """
+
+    # Backing mongo collection and document field names used below.
+    COLLECTION = "connection_tests"
+    ID = "_id"
+    TARGETS = "test_targets"
+    RESULTS = "test_results"
+    CONFIGURATIONS = "targets_configuration"
+    # Allowed values for the 'status' query filter.
+    STATUSES = ["request", "response"]
+
+    def __init__(self):
+        super().__init__()
+        # Fields projected when listing tests (see on_get else-branch).
+        self.PROJECTION = {
+            self.ID: True,
+            self.TARGETS: True,
+            self.RESULTS: True
+        }
+        # Allowed test target names, taken from the
+        # "configuration_targets" constants document.
+        self.allowed_targets = \
+            self.get_constants_by_name("configuration_targets")
+
+    def build_query(self, filters):
+        """Build a mongo query dict from the validated request filters.
+
+        At least one of 'id' / 'env_name' must be present ('id' takes
+        precedence); otherwise responds with 400 via bad_request.
+        """
+        query = {}
+
+        # Copy the optional 'status' filter through as-is.
+        self.update_query_with_filters(filters, ["status"], query)
+
+        if 'id' in filters:
+            query[self.ID] = filters['id']
+        elif 'env_name' in filters:
+            query['environment'] = filters['env_name']
+        else:
+            self.bad_request(message="Either 'id' or 'env_name' "
+                                     "field is required")
+
+        return query
+
+    def on_get(self, req, resp):
+        """Return one connection test (by id) or a list (by env_name)."""
+        self.log.debug("Getting a connection test")
+        filters = self.parse_query_params(req)
+
+        filters_requirements = {
+            'env_name': self.require(str, mandatory=True),
+            'id': self.require(ObjectId, convert_to_type=True),
+            # NOTE(review): other list-constrained filters in this API
+            # pass validate=DataValidate.LIST together with
+            # 'requirement' - confirm whether it is needed here for
+            # STATUSES to actually be enforced.
+            'status': self.require(str,
+                                   requirement=self.STATUSES),
+            self.TARGETS: self.require([list, str],
+                                       validate=DataValidate.LIST,
+                                       requirement=self.allowed_targets),
+            self.RESULTS: self.require(bool, convert_to_type=True),
+            'page': self.require(int, convert_to_type=True),
+            'page_size': self.require(int, convert_to_type=True)
+        }
+
+        self.validate_query_data(filters, filters_requirements)
+
+        query = self.build_query(filters)
+
+        if self.ID in query:
+            # Single-document flow: fetch by id, then narrow the
+            # embedded targets/configurations/results per the filters.
+            result = self.get_object_by_id(collection=self.COLLECTION,
+                                           query=query,
+                                           stringify_types=[ObjectId,
+                                                            datetime.datetime],
+                                           id=self.ID)
+
+            test_targets = result.get(self.TARGETS, [])
+            targets_config = result.get(self.CONFIGURATIONS, [])
+            test_results = result.get(self.RESULTS, {})
+
+            # Filter data by target names
+            targets_filter = filters.get(self.TARGETS)
+            if targets_filter:
+                test_targets = [target
+                                for target in test_targets
+                                if target in targets_filter]
+                targets_config = [config
+                                  for config in targets_config
+                                  if config['name'] in targets_filter]
+                test_results = {target: result
+                                for target, result in test_results.items()
+                                if target in targets_filter}
+
+            # Filter data by test results (success/failure)
+            results_filter = filters.get(self.RESULTS)
+            if results_filter is not None:
+                test_results = {target: result
+                                for target, result in test_results.items()
+                                if result == results_filter}
+
+                # Keep only targets/configs that still have a result
+                # entry after the filtering above.
+                results_keys = test_results.keys()
+                test_targets = [target
+                                for target in test_targets
+                                if target in results_keys]
+                targets_config = [config
+                                  for config in targets_config
+                                  if config['name'] in results_keys]
+
+            result[self.TARGETS] = test_targets
+            result[self.CONFIGURATIONS] = targets_config
+            result[self.RESULTS] = test_results
+
+            self.set_successful_response(resp, result)
+        else:
+            # List flow: return projected summaries, paginated.
+            page, page_size = self.get_pagination(filters)
+            tests_ids = self.get_objects_list(collection=self.COLLECTION,
+                                              query=query,
+                                              page=page,
+                                              page_size=page_size,
+                                              projection=self.PROJECTION)
+            self.set_successful_response(resp, {"connection_tests": tests_ids})
+
+    def on_post(self, req, resp):
+        """Validate and store a new connection test request."""
+        self.log.debug("Posting a new connection test")
+        error, connection_test = self.get_content_from_request(req)
+        if error:
+            self.bad_request(error)
+
+        conn_test_requirements = {
+            "environment": self.require(str, mandatory=True),
+            self.TARGETS: self.require(list,
+                                       mandatory=True,
+                                       validate=DataValidate.LIST,
+                                       requirement=self.allowed_targets),
+            self.CONFIGURATIONS: self.require(list, mandatory=True)
+        }
+        self.validate_query_data(connection_test, conn_test_requirements)
+
+        test_targets = connection_test[self.TARGETS]
+        targets_configuration = connection_test[self.CONFIGURATIONS]
+        env_name = connection_test["environment"]
+
+        # Reuse the environment-config validation, but without requiring
+        # every mandatory configuration section to be present.
+        env_configs = EnvironmentConfigs()
+        config_validation = env_configs.validate_environment_config(
+            connection_test[self.CONFIGURATIONS],
+            require_mandatory=False
+        )
+        if not config_validation['passed']:
+            self.bad_request(config_validation['error_message'])
+
+        # Every requested target must have a matching configuration entry.
+        for test_target in test_targets:
+            if not env_configs.get_configuration_by_name(test_target,
+                                                         targets_configuration):
+                self.bad_request("targets_configuration should contain "
+                                 "an entry for target '{}'".format(test_target))
+
+        connection_test['submit_timestamp'] = datetime.datetime.now()
+
+        result = self.write(connection_test, self.COLLECTION)
+        self.set_successful_response(resp,
+                                     {"id": str(result.inserted_id),
+                                      "message": "Created a new connection test"
+                                                 " for environment {0}"
+                                                 .format(env_name)},
+                                     "201")
"mysql": {
"name": self.require(str, mandatory=True),
"host": self.require(str,
+ mandatory=True,
validate=DataValidate.REGEX,
- requirement=[regex.IP, regex.HOSTNAME],
- mandatory=True),
+ requirement=[regex.IP, regex.HOSTNAME]),
"pwd": self.require(str, mandatory=True),
"port": self.require(int,
- True,
- DataValidate.REGEX,
- regex.PORT,
- mandatory=True),
+ mandatory=True,
+ convert_to_type=True,
+ validate=DataValidate.REGEX,
+ requirement=regex.PORT),
"user": self.require(str, mandatory=True)
},
"OpenStack": {
"name": self.require(str, mandatory=True),
"admin_token": self.require(str, mandatory=True),
"host": self.require(str,
+ mandatory=True,
validate=DataValidate.REGEX,
- requirement=[regex.IP, regex.HOSTNAME],
- mandatory=True),
+ requirement=[regex.IP, regex.HOSTNAME]),
"port": self.require(int,
- True,
+ mandatory=True,
+ convert_to_type=True,
validate=DataValidate.REGEX,
- requirement=regex.PORT,
- mandatory=True),
+ requirement=regex.PORT),
"pwd": self.require(str, mandatory=True),
"user": self.require(str, mandatory=True)
},
"CLI": {
"name": self.require(str, mandatory=True),
"host": self.require(str,
+ mandatory=True,
validate=DataValidate.REGEX,
- requirement=[regex.IP, regex.HOSTNAME],
- mandatory=True),
+ requirement=[regex.IP, regex.HOSTNAME]),
"user": self.require(str, mandatory=True),
"pwd": self.require(str),
"key": self.require(str,
"AMQP": {
"name": self.require(str, mandatory=True),
"host": self.require(str,
+ mandatory=True,
validate=DataValidate.REGEX,
- requirement=[regex.IP, regex.HOSTNAME],
- mandatory=True),
+ requirement=[regex.IP, regex.HOSTNAME]),
"pwd": self.require(str, mandatory=True),
"port": self.require(int,
- True,
+ mandatory=True,
+ convert_to_type=True,
validate=DataValidate.REGEX,
- requirement=regex.PORT,
- mandatory=True),
+ requirement=regex.PORT),
"user": self.require(str, mandatory=True)
},
"Monitoring": {
"name": self.require(str, mandatory=True),
"config_folder": self.require(str,
+ mandatory=True,
validate=DataValidate.REGEX,
- requirement=regex.PATH,
- mandatory=True),
+ requirement=regex.PATH),
"provision": self.require(str,
+ mandatory=True,
validate=DataValidate.LIST,
- requirement=self.provision_types,
- mandatory=True),
+ requirement=self.provision_types),
"env_type": self.require(str,
+ mandatory=True,
validate=DataValidate.LIST,
- requirement=self.env_types,
- mandatory=True),
- "api_port": self.require(int, True, mandatory=True),
+ requirement=self.env_types),
+ "api_port": self.require(int,
+ mandatory=True,
+ convert_to_type=True),
"rabbitmq_pass": self.require(str, mandatory=True),
"rabbitmq_user": self.require(str, mandatory=True),
"rabbitmq_port": self.require(int,
- True,
+ mandatory=True,
+ convert_to_type=True,
validate=DataValidate.REGEX,
- requirement=regex.PORT,
- mandatory=True),
+ requirement=regex.PORT),
"ssh_port": self.require(int,
- True,
+ convert_to_type=True,
validate=DataValidate.REGEX,
requirement=regex.PORT),
"ssh_user": self.require(str),
"ssh_password": self.require(str),
"server_ip": self.require(str,
+ mandatory=True,
validate=DataValidate.REGEX,
- requirement=[regex.IP, regex.HOSTNAME],
- mandatory=True),
+ requirement=[regex.IP, regex.HOSTNAME]),
"server_name": self.require(str, mandatory=True),
"type": self.require(str,
+ mandatory=True,
validate=DataValidate.LIST,
- requirement=self.monitoring_types,
- mandatory=True)
+ requirement=self.monitoring_types)
},
"NFV_provider": {
"name": self.require(str, mandatory=True),
"host": self.require(str,
+ mandatory=True,
validate=DataValidate.REGEX,
- requirement=[regex.IP, regex.HOSTNAME],
- mandatory=True),
+ requirement=[regex.IP, regex.HOSTNAME]),
"nfv_token": self.require(str, mandatory=True),
"port": self.require(int,
- True,
- DataValidate.REGEX,
- regex.PORT,
- True),
+ mandatory=True,
+ convert_to_type=True,
+ validate=DataValidate.REGEX,
+ requirement=regex.PORT),
"user": self.require(str, mandatory=True),
"pwd": self.require(str, mandatory=True)
},
"ACI": {
"name": self.require(str, mandatory=True),
"host": self.require(str,
+ mandatory=True,
validate=DataValidate.REGEX,
- requirement=[regex.IP, regex.HOSTNAME],
- mandatory=True),
+ requirement=[regex.IP, regex.HOSTNAME]),
"user": self.require(str, mandatory=True),
"pwd": self.require(str, mandatory=True)
}
filters_requirements = {
"name": self.require(str),
- "distribution": self.require(str, False,
- DataValidate.LIST,
- self.distributions),
+ "distribution": self.require(str,
+ validate=DataValidate.LIST,
+ requirement=self.distributions),
"mechanism_drivers": self.require([str, list],
- False,
- DataValidate.LIST,
- self.mechanism_drivers),
- "type_drivers": self.require(str, False,
- DataValidate.LIST,
- self.type_drivers),
+ validate=DataValidate.LIST,
+ requirement=self.mechanism_drivers),
+ "type_drivers": self.require(str,
+ validate=DataValidate.LIST,
+ requirement=self.type_drivers),
"user": self.require(str),
- "listen": self.require(bool, True),
- "scanned": self.require(bool, True),
- "monitoring_setup_done": self.require(bool, True),
- "operational": self.require(str, False,
- DataValidate.LIST,
- self.operational_values),
- "page": self.require(int, True),
- "page_size": self.require(int, True)
+ "listen": self.require(bool, convert_to_type=True),
+ "scanned": self.require(bool, convert_to_type=True),
+ "monitoring_setup_done": self.require(bool, convert_to_type=True),
+ "operational": self.require(str,
+ validate=DataValidate.LIST,
+ requirement=self.operational_values),
+ "page": self.require(int, convert_to_type=True),
+ "page_size": self.require(int, convert_to_type=True)
}
self.validate_query_data(filters, filters_requirements)
environment_config_requirement = {
"app_path": self.require(str, mandatory=True),
"configuration": self.require(list, mandatory=True),
- "distribution": self.require(str, False, DataValidate.LIST,
- self.distributions, True),
+ "distribution": self.require(str,
+ mandatory=True,
+ validate=DataValidate.LIST,
+ requirement=self.distributions),
"distribution_version": self.require(str, mandatory=True),
- "listen": self.require(bool, True, mandatory=True),
+ "listen": self.require(bool,
+ mandatory=True,
+ convert_to_type=True),
"user": self.require(str),
- "mechanism_drivers": self.require(list, False, DataValidate.LIST,
- self.mechanism_drivers, True),
+ "mechanism_drivers": self.require(list,
+ mandatory=True,
+ validate=DataValidate.LIST,
+ requirement=self.mechanism_drivers),
"name": self.require(str, mandatory=True),
- "operational": self.require(str, True, DataValidate.LIST,
- self.operational_values, mandatory=True),
- "scanned": self.require(bool, True),
+ "operational": self.require(str,
+ mandatory=True,
+ convert_to_type=True,
+ validate=DataValidate.LIST,
+ requirement=self.operational_values),
+ "scanned": self.require(bool, convert_to_type=True),
"last_scanned": self.require(str),
"type": self.require(str, mandatory=True),
- "type_drivers": self.require(str, False, DataValidate.LIST,
- self.type_drivers, True),
- "enable_monitoring": self.require(bool, True),
- "monitoring_setup_done": self.require(bool, True),
- "auth": self.require(dict)
+ "type_drivers": self.require(str,
+ mandatory=True,
+ validate=DataValidate.LIST,
+ requirement=self.type_drivers),
+ "enable_monitoring": self.require(bool, convert_to_type=True),
+ "monitoring_setup_done": self.require(bool, convert_to_type=True),
+ "auth": self.require(dict),
+ "aci_enabled": self.require(bool, convert_to_type=True)
}
self.validate_query_data(env_config,
environment_config_requirement,
.format(env_config["name"])},
"201")
- def validate_environment_config(self, configurations):
+ def validate_environment_config(self, configurations,
+ require_mandatory=True):
configurations_of_names = {}
validation = {"passed": True}
if [config for config in configurations
"configuration for {0}".format(name)
return validation
configurations_of_names[name] = configs[0]
- else:
+ elif require_mandatory:
if name not in self.OPTIONAL_CONFIGURATIONS_NAMES:
validation["passed"] = False
validation['error_message'] = "configuration for {0} " \
'id_path': self.require(str),
'parent_id': self.require(str),
'parent_path': self.require(str),
- 'sub_tree': self.require(bool, True),
- 'page': self.require(int, True),
- 'page_size': self.require(int, True)
+ 'sub_tree': self.require(bool, convert_to_type=True),
+ 'page': self.require(int, convert_to_type=True),
+ 'page_size': self.require(int, convert_to_type=True)
}
self.validate_query_data(filters, filters_requirements)
page, page_size = self.get_pagination(filters)
link_states = self.get_constants_by_name("link_states")
filters_requirements = {
'env_name': self.require(str, mandatory=True),
- 'id': self.require(ObjectId, True),
+ 'id': self.require(ObjectId, convert_to_type=True),
'host': self.require(str),
- 'link_type': self.require(str, validate=DataValidate.LIST,
+ 'link_type': self.require(str,
+ validate=DataValidate.LIST,
requirement=link_types),
'link_name': self.require(str),
'source_id': self.require(str),
'target_id': self.require(str),
- 'state': self.require(str, validate=DataValidate.LIST,
+ 'state': self.require(str,
+ validate=DataValidate.LIST,
requirement=link_states),
- 'page': self.require(int, True),
- 'page_size': self.require(int, True)
+ 'page': self.require(int, convert_to_type=True),
+ 'page_size': self.require(int, convert_to_type=True)
}
self.validate_query_data(filters, filters_requirements, r'^attributes\:\w+$')
'env_name': self.require(str, mandatory=True),
'source_system': self.require(str),
'id': self.require(str),
- 'level': self.require(str, validate=DataValidate.LIST,
+ 'level': self.require(str,
+ validate=DataValidate.LIST,
requirement=messages_severity),
'related_object': self.require(str),
- 'related_object_type': self.require(str, validate=DataValidate.LIST,
+ 'related_object_type': self.require(str,
+ validate=DataValidate.LIST,
requirement=object_types),
'start_time': self.require(str),
'end_time': self.require(str),
- 'page': self.require(int, True),
- 'page_size': self.require(int, True)
+ 'page': self.require(int, convert_to_type=True),
+ 'page_size': self.require(int, convert_to_type=True)
}
self.validate_query_data(filters, filters_requirements)
page, page_size = self.get_pagination(filters)
sides = self.get_constants_by_name("monitoring_sides")
filters_requirements = {
- "id": self.require(ObjectId, True),
- "order": self.require(int, True),
- "side": self.require(str, validate=DataValidate.LIST,
+ "id": self.require(ObjectId, convert_to_type=True),
+ "order": self.require(int, convert_to_type=True),
+ "side": self.require(str,
+ validate=DataValidate.LIST,
requirement=sides),
"type": self.require(str),
- "page": self.require(int, True),
- "page_size": self.require(int, True)
+ "page": self.require(int, convert_to_type=True),
+ "page_size": self.require(int, convert_to_type=True)
}
self.validate_query_data(filters, filters_requirements)
"id": self.require(ObjectId, convert_to_type=True),
"base_object": self.require(str),
"status": self.require(str,
- convert_to_type=False,
validate=DataValidate.LIST,
requirement=scan_statuses),
"page": self.require(int, convert_to_type=True),
filters_requirements = {
"environment": self.require(str, mandatory=True),
- "id": self.require(ObjectId, True),
- "freq": self.require(str, False,
- DataValidate.LIST, self.SCAN_FREQ),
- "page": self.require(int, True),
- "page_size": self.require(int, True)
+ "id": self.require(ObjectId, convert_to_type=True),
+ "freq": self.require(str,
+ validate=DataValidate.LIST,
+ requirement=self.SCAN_FREQ),
+ "page": self.require(int, convert_to_type=True),
+ "page_size": self.require(int, convert_to_type=True)
}
self.validate_query_data(filters, filters_requirements)
log_levels = self.get_constants_by_name("log_levels")
scheduled_scan_requirements = {
"environment": self.require(str, mandatory=True),
- "scan_only_links": self.require(bool, True),
- "scan_only_cliques": self.require(bool, True),
- "scan_only_inventory": self.require(bool, True),
- "freq": self.require(str, validate=DataValidate.LIST,
- requirement=self.SCAN_FREQ,
- mandatory=True),
+ "scan_only_links": self.require(bool, convert_to_type=True),
+ "scan_only_cliques": self.require(bool, convert_to_type=True),
+ "scan_only_inventory": self.require(bool, convert_to_type=True),
+ "freq": self.require(str,
+ mandatory=True,
+ validate=DataValidate.LIST,
+ requirement=self.SCAN_FREQ),
"log_level": self.require(str,
validate=DataValidate.LIST,
requirement=log_levels),
- "clear": self.require(bool, True),
+ "clear": self.require(bool, convert_to_type=True),
"submit_timestamp": self.require(str, mandatory=True)
}
self.validate_query_data(scheduled_scan, scheduled_scan_requirements)
env_name = scheduled_scan["environment"]
if not self.check_environment_name(env_name):
- self.bad_request("unkown environment: " + env_name)
+ self.bad_request("unknown environment: " + env_name)
self.write(scheduled_scan, self.COLLECTION)
self.set_successful_response(resp,
- {"message": "created a new scheduled scan for "
- "environment {0}"
+ {"message": "created a new scheduled scan "
+ "for environment {0}"
.format(env_name)},
"201")
env_name = query.get("environment")
if env_name and \
not self.check_environment_name(env_name):
- self.bad_request("unkown environment: " + env_name)
+ self.bad_request("unknown environment: " + env_name)
self.not_found()
obj = objs[0]
stringify_object_values_by_types(obj, stringify_types)
- if id is "_id":
+ if id == "_id":
obj['id'] = obj.get('_id')
return obj
- def get_objects_list(self, collection, query, page, page_size,
- projection, stringify_types=None):
+ def get_objects_list(self, collection, query, page=0, page_size=1000,
+ projection=None, stringify_types=None):
objects = self.read(collection, query, projection, page, page_size)
if not objects:
env_name = query.get("environment")
if env_name and \
not self.check_environment_name(env_name):
- self.bad_request("unkown environment: " + env_name)
+ self.bad_request("unknown environment: " + env_name)
self.not_found()
for obj in objects:
if "id" not in obj and "_id" in obj:
def write(self, document, collection="inventory"):
try:
- self.get_collection_by_name(collection).\
- insert_one(document)
+ return self.get_collection_by_name(collection)\
+ .insert_one(document)
except errors.DuplicateKeyError as e:
self.conflict("The key value ({0}) already exists".
format(', '.
self.inv = InventoryMgr()
self.ethtool_attr = re.compile('^\s+([^:]+):\s(.*)$')
self.regexps = [
- {'name': 'mac_address', 're': '^.*\sHWaddr\s(\S+)(\s.*)?$',
- 'description': 'MAC address with HWaddr'},
- {'name': 'mac_address', 're': '^.*\sether\s(\S+)(\s.*)?$',
- 'description': 'MAC address with ether'},
- {'name': 'IP Address', 're': '^\s*inet addr:?(\S+)\s.*$',
- 'description': 'IP Address with "inet addr"'},
- {'name': 'IP Address', 're': '^\s*inet ([0-9.]+)\s.*$',
- 'description': 'IP Address with "inet"'},
- {'name': 'IPv6 Address', 're': '^\s*inet6 addr:\s*(\S+)(\s.*)?$',
- 'description': 'IPv6 Address with "inet6 addr"'},
- {'name': 'IPv6 Address', 're': '^\s*inet6 \s*(\S+)(\s.*)?$',
- 'description': 'IPv6 Address with "inet6"'}
+ {'name': 'mac_address', 're': '^.*\slink/ether\s(\S+)\s',
+ 'description': 'MAC address'},
+ {'name': 'IP Address', 're': '^\s*inet ([0-9.]+)/',
+ 'description': 'IP Address v4'},
+ {'name': 'IPv6 Address', 're': '^\s*inet6 (\S+) .* global ',
+ 'description': 'IPv6 Address'}
]
def get(self, id):
for line in interface_lines:
interface_name = line[line.rindex('/')+1:]
interface_name = interface_name.strip()
- # run ifconfig with specific interface name,
+ # run 'ip address show' with specific interface name,
# since running it with no name yields a list without inactive pNICs
interface = self.find_interface_details(host_id, interface_name)
if interface:
return interfaces
def find_interface_details(self, host_id, interface_name):
- lines = self.run_fetch_lines("ifconfig " + interface_name, host_id)
+ cmd = "ip address show {}".format(interface_name)
+ lines = self.run_fetch_lines(cmd, host_id)
interface = None
status_up = None
for line in [l for l in lines if l != '']:
tokens = None
if interface is None:
tokens = line.split()
- line_remainder = line.strip('-')[len(interface_name)+2:]
- line_remainder = line_remainder.strip(' :')
+ line_remainder = line.split(":")[2].strip()
interface = {
"host": host_id,
"name": interface_name,
def __init__(self):
super().__init__()
self.inv = InventoryMgr()
- self.if_header = re.compile('^[-]?(\S+)\s+(.*)$')
+ self.if_header = re.compile('^\d+: ([^:]+): (.+)')
self.regexps = [
- {'name': 'mac_address', 're': '^.*\sHWaddr\s(\S+)(\s.*)?$'},
- {'name': 'mac_address', 're': '^.*\sether\s(\S+)(\s.*)?$'},
- {'name': 'IP Address', 're': '^\s*inet addr:(\S+)\s.*$'},
- {'name': 'IP Address', 're': '^\s*inet ([0-9.]+)\s.*$'},
- {'name': 'netmask', 're': '^.*\sMask:\s?([0-9.]+)(\s.*)?$'},
- {'name': 'netmask', 're': '^.*\snetmask\s([0-9.]+)(\s.*)?$'},
+ {'name': 'mac_address', 're': '^.*\slink/ether\s(\S+)\s'},
+ {'name': 'IP Address', 're': '^\s*inet ([0-9.]+)/'},
+ {'name': 'netmask', 're': '^.*\slink/ether\s[^/]+/(\S+)'},
{'name': 'IPv6 Address',
- 're': '^\s*inet6 addr: ?\s*([0-9a-f:/]+)(\s.*)?$'},
- {'name': 'IPv6 Address',
- 're': '^\s*inet6 \s*([0-9a-f:/]+)(\s.*)?$'}
+ 're': '^\s*inet6 ([^/]+)/.* global '}
]
def get(self, host_id):
return ret
def handle_service(self, host, service, enable_cache=True):
- cmd = "ip netns exec " + service + " ifconfig"
+ cmd = "ip netns exec " + service + " ip address show"
lines = self.run_fetch_lines(cmd, host, enable_cache)
interfaces = []
current = None
vnic["IP Address"] = "No IP Address"
return "No IP Address"
ipaddr = vnic["IP Address"].split('.')
+ vnic['netmask'] = self.convert_netmask(vnic['netmask'])
netmask = vnic["netmask"].split('.')
# calculate network start
for octet in netmask:
binary_str += bin(int(octet))[2:].zfill(8)
return str(len(binary_str.rstrip('0')))
+
+    @staticmethod
+    def convert_netmask(cidr):
+        """Convert a CIDR prefix length string ('16'..'30') to a
+        dotted-quad netmask string, e.g. '24' -> '255.255.255.0'.
+
+        Raises ValueError for prefix lengths outside the supported range.
+        """
+        netmask_conversion = {
+            '30': '255.255.255.252',
+            '29': '255.255.255.248',
+            '28': '255.255.255.240',
+            '27': '255.255.255.224',
+            '26': '255.255.255.192',
+            '25': '255.255.255.128',
+            '24': '255.255.255.0',
+            '23': '255.255.254.0',
+            '22': '255.255.252.0',
+            '21': '255.255.248.0',
+            '20': '255.255.240.0',
+            '19': '255.255.224.0',
+            '18': '255.255.192.0',
+            '17': '255.255.128.0',
+            '16': '255.255.0.0'
+        }
+        if cidr not in netmask_conversion:
+            # Bug fix: the original message used '' inside a '-quoted
+            # literal, which Python concatenates to "cant"; escape the
+            # apostrophe so the message reads correctly.
+            raise ValueError("can't convert to netmask: {}".format(cidr))
+        # Membership was just checked, so index directly instead of .get().
+        return netmask_conversion[cidr]
return results
# find matching vConnector by tunneling_ip of vEdge
- # look for that IP address in ifconfig for the host
+ # look for that IP address in 'ip address show' output for the host
def get_vconnector(self, doc, host_id, vedge):
tunneling_ip = vedge["configurations"]["tunneling_ip"]
- ifconfig_lines = self.run_fetch_lines("ifconfig", host_id)
+ output_lines = self.run_fetch_lines("ip address show", host_id)
interface = None
- ip_string = " " * 10 + "inet addr:" + tunneling_ip + " "
+ ip_string = " inet {}/".format(tunneling_ip)
vconnector = None
- for l in ifconfig_lines:
+ for l in output_lines:
if l.startswith(" "):
if interface and l.startswith(ip_string):
vconnector = interface
break
else:
if " " in l:
- interface = l[:l.index(" ")]
+ # line format is like this:
+ # <interface number>: <interface name>: ....
+ interface = l.split(":")[1].strip()
if vconnector:
doc["vconnector"] = vconnector
network_name = None
network_id = None
for net in instance["network_info"]:
- if net["devname"] == v["id"]:
+ if "{}-{}".format(v["host"], net["devname"]) == v["id"]:
network_name = net["network"]["label"]
network_id = net['network']['id']
v['network'] = network_id
args = setup_args(args, self.DEFAULTS, self.get_args)
# After this setup we assume args dictionary has all keys
# defined in self.DEFAULTS
+ self.log.set_loglevel(args['loglevel'])
try:
MongoAccess.set_config_file(args['mongo_config'])
self.inv = InventoryMgr()
+ self.inv.log.set_loglevel(args['loglevel'])
self.inv.set_collections(args['inventory'])
self.conf = Configuration()
except FileNotFoundError as e:
# generate ScanObject Class and instance.
scanner = Scanner()
+ scanner.log.set_loglevel(args['loglevel'])
scanner.set_env(env_name)
scanner.found_errors[env_name] = False
# which accompanies this distribution, and is available at #
# http://www.apache.org/licenses/LICENSE-2.0 #
###############################################################################
+import copy
import unittest
from discover.fetchers.cli.cli_fetch_host_pnics import CliFetchHostPnics
self.assertEqual(result, INTERFACE_DETAILS, "Can't get interface details")
def test_handle_mac_address_line(self):
- self.fetcher.handle_line(RAW_INTERFACE, MAC_ADDRESS_LINE)
- self.assertEqual(RAW_INTERFACE["mac_address"], MAC_ADDRESS,
- "Can't get the correct mac address")
+ interface = copy.deepcopy(RAW_INTERFACE)
+ self.fetcher.handle_line(interface, MAC_ADDRESS_LINE)
+ self.assertEqual(interface["mac_address"], MAC_ADDRESS,
+ "Can't get the correct MAC address")
# Test failed, defect, result: addr: expected result: fe80::f816:3eff:fea1:eb73/64
- @unittest.SkipTest
def test_handle_ipv6_address_line(self):
- self.fetcher.handle_line(RAW_INTERFACE, IPV6_ADDRESS_LINE)
- self.assertEqual(RAW_INTERFACE['IPv6 Address'], IPV6_ADDRESS,
+ interface = copy.deepcopy(RAW_INTERFACE)
+ self.fetcher.handle_line(interface, IPV6_ADDRESS_LINE)
+ self.assertEqual(interface['IPv6 Address'], IPV6_ADDRESS,
"Can't get the correct ipv6 address")
def test_handle_ipv4_address_line(self):
- self.fetcher.handle_line(RAW_INTERFACE, IPV4_ADDRESS_LINE)
- self.assertEqual(RAW_INTERFACE['IP Address'], IPV4_ADDRESS,
+ interface = copy.deepcopy(RAW_INTERFACE)
+ self.fetcher.handle_line(interface, IPV4_ADDRESS_LINE)
+ self.assertEqual(interface['IP Address'], IPV4_ADDRESS,
"Can't get the correct ipv4 address")
def test_set_interface_data(self):
# which accompanies this distribution, and is available at #
# http://www.apache.org/licenses/LICENSE-2.0 #
###############################################################################
+import copy
+
from discover.fetchers.cli.cli_fetch_vservice_vnics import CliFetchVserviceVnics
from test.fetch.test_fetch import TestFetch
from test.fetch.cli_fetch.test_data.cli_fetch_vservice_vnics import *
# reset methods
self.fetcher.inv.get_by_id = original_get_by_id
- self.assertEqual(result, [], "Can't get empty array when the host doesn't contain host_type")
+ self.assertEqual(result, [],
+ "Can't get empty array when the host "
+ "doesn't contain host_type")
def test_get_with_compute_host(self):
# store original methods
# reset methods
self.fetcher.inv.get_by_id = original_get_by_id
- self.assertEqual(result, [], "Can't get empty array when the host type doesn't contain network")
+ self.assertEqual(result, [],
+ "Can't get empty array when the host type "
+ "doesn't contain network")
def test_handle_service(self):
# store original method
original_run_fetch_lines = self.fetcher.run_fetch_lines
original_set_interface_data = self.fetcher.set_interface_data
# mock the method
- self.fetcher.run_fetch_lines = MagicMock(return_value=IFCONFIG_RESULT)
+ self.fetcher.run_fetch_lines = \
+ MagicMock(return_value=IP_ADDRESS_SHOW_RESULT)
self.fetcher.set_interface_data = MagicMock()
result = self.fetcher.handle_service(NETWORK_NODE['id'], SERVICE_ID)
# reset method
self.fetcher.set_interface_data = original_set_interface_data
self.assertNotEqual(result, [], "Can't get interfaces data")
+ self.assertEqual(result[0].get("IPv6 Address"), IPV6_ADDRESS,
+ "incorrect IPv6 address")
def test_set_interface_data(self):
# store original methods
self.fetcher.inv.get_by_id = MagicMock(return_value=VSERVICE)
self.fetcher.inv.set = MagicMock()
- self.fetcher.set_interface_data(VNIC)
+ vnic = copy.deepcopy(VNIC)
+ self.fetcher.set_interface_data(vnic)
# reset methods
self.fetcher.inv.get_by_field = original_get_by_field
self.fetcher.inv.get_by_id = original_get_by_id
self.fetcher.inv.set = original_set
- self.assertIn("data", VNIC, "Can't set data")
- self.assertIn("cidr", VNIC, "Can't set cidr")
- self.assertIn("network", VNIC, "Can't set network")
+ self.assertIn("data", vnic, "Can't set data")
+ self.assertIn("cidr", vnic, "Can't set cidr")
+ self.assertIn("network", vnic, "Can't set network")
def test_handle_mac_address_line(self):
self.fetcher.handle_line(RAW_VNIC, MAC_ADDRESS_LINE)
- self.assertEqual(RAW_VNIC['mac_address'], MAC_ADDRESS, "Can't get the correct mac address from the line")
+ self.assertEqual(RAW_VNIC['mac_address'], MAC_ADDRESS,
+ "Can't get the correct mac address from the line")
def test_handle_ipv4_address_line(self):
self.fetcher.handle_line(RAW_VNIC, IPV4_ADDRESS_LINE)
- self.assertEqual(RAW_VNIC['IP Address'], IPV4_ADDRESS, "Can't get the correct ipv4 address from the line")
+ self.assertEqual(RAW_VNIC['IP Address'], IPV4_ADDRESS,
+ "Can't get the correct ipv4 address from the line")
def test_handle_ipv6_address_line(self):
self.fetcher.handle_line(RAW_VNIC, IPV6_ADDRESS_LINE)
- self.assertEqual(RAW_VNIC['IPv6 Address'], IPV6_ADDRESS, "Can't get the correct ipv6 address from the line")
+ self.assertEqual(RAW_VNIC['IPv6 Address'], IPV6_ADDRESS,
+ "Can't get the correct ipv6 address from the line")
def test_get_net_size(self):
size = self.fetcher.get_net_size(NET_MASK_ARRAY)
self.assertEqual(size, SIZE, "Can't get the size of network by netmask")
def test_get_cidr_for_vnic(self):
- cidr = self.fetcher.get_cidr_for_vnic(VNIC)
+ vnic = copy.deepcopy(VNIC)
+ cidr = self.fetcher.get_cidr_for_vnic(vnic)
self.assertEqual(cidr, CIDR, "the cidr info is wrong")
"state": "UP"
}
-MAC_ADDRESS_LINE = "eno16777728 Link encap:Ethernet HWaddr 00:50:56:ac:e8:97 "
+MAC_ADDRESS_LINE = " link/ether 00:50:56:ac:e8:97 brd ff:ff:ff:ff:ff:ff"
MAC_ADDRESS = "00:50:56:ac:e8:97"
RAW_INTERFACE = {
"host": "node-6.cisco.com",
INTERFACES_GET_RESULTS = [INTERFACE]
-IPV6_ADDRESS_LINE = " inet6 addr: fe80::f816:3eff:fea1:eb73/64 Scope:Link"
+IPV6_ADDRESS_LINE = " inet6 fe80::f816:3eff:fea1:eb73/64 " \
+ "scope link"
IPV6_ADDRESS = "fe80::f816:3eff:fea1:eb73/64"
-IPV4_ADDRESS_LINE = " inet addr:172.16.13.2 Bcast:172.16.13.255 Mask:255.255.255.0"
+IPV4_ADDRESS_LINE = " inet 172.16.13.2/24 brd 172.16.13.255 scope global eth0"
IPV4_ADDRESS = "172.16.13.2"
ETHTOOL_RESULT = [
SIZE = '24'
VNIC = {
- "IP Address": "172.16.13.2",
- "IPv6 Address": "fe80::f816:3eff:fea1:eb73/64",
+ "IP Address": "10.56.20.80",
+ "IPv6 Address": "2001:420:4482:24c1:250:56ff:feac:502a",
"host": "node-6.cisco.com",
- "id": "tapa68b2627-a1",
+ "id": "eth0",
"lines": [
- "Link encap:Ethernet HWaddr fa:16:3e:a1:eb:73",
- "inet addr:172.16.13.2 Bcast:172.16.13.255 Mask:255.255.255.0",
- "inet6 addr: fe80::f816:3eff:fea1:eb73/64 Scope:Link",
- "UP BROADCAST RUNNING MULTICAST MTU:1450 Metric:1",
- "RX packets:28 errors:0 dropped:35 overruns:0 frame:0",
- "TX packets:8 errors:0 dropped:0 overruns:0 carrier:0",
- "collisions:0 txqueuelen:0",
- "RX bytes:4485 (4.4 KB) TX bytes:648 (648.0 B)",
- ""
+ "<BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000",
+ "link/ether 00:50:56:ac:50:2a brd ff:ff:ff:ff:ff:ff",
+ "inet 10.56.20.80/24 brd 10.56.20.255 scope global eth0",
+ "valid_lft forever preferred_lft forever",
+ "inet6 2001:420:4482:24c1:250:56ff:feac:502a/64 scope global mngtmpaddr dynamic",
+ "valid_lft 2591971sec preferred_lft 604771sec",
+ "inet6 fe80::250:56ff:feac:502a/64 scope link",
+ "valid_lft forever preferred_lft forever"
],
- "mac_address": "fa:16:3e:a1:eb:73",
+ "mac_address": "00:50:56:ac:50:2a",
"master_parent_id": "qdhcp-8673c48a-f137-4497-b25d-08b7b218fd17",
"master_parent_type": "vservice",
"name": "tapa68b2627-a1",
- "netmask": "255.255.255.0",
+ "netmask": "24",
"parent_id": "qdhcp-8673c48a-f137-4497-b25d-08b7b218fd17-vnics",
"parent_text": "vNICs",
"parent_type": "vnics_folder",
"lines": [],
"master_parent_id": "qdhcp-8673c48a-f137-4497-b25d-08b7b218fd17",
"master_parent_type": "vservice",
- "name": "tapa68b2627-a1",
+ "name": "eth0",
"parent_id": "qdhcp-8673c48a-f137-4497-b25d-08b7b218fd17-vnics",
"parent_text": "vNICs",
"parent_type": "vnics_folder",
}
-CIDR = "172.16.13.0/24"
+CIDR = "10.56.20.0/24"
-IFCONFIG_RESULT = [
- "lo Link encap:Local Loopback ",
- " inet addr:127.0.0.1 Mask:255.0.0.0",
- " inet6 addr: ::1/128 Scope:Host",
- " UP LOOPBACK RUNNING MTU:65536 Metric:1",
- " RX packets:0 errors:0 dropped:0 overruns:0 frame:0",
- " TX packets:0 errors:0 dropped:0 overruns:0 carrier:0",
- " collisions:0 txqueuelen:0 ",
- " RX bytes:0 (0.0 B) TX bytes:0 (0.0 B)",
- "",
- "tapa68b2627-a1 Link encap:Ethernet HWaddr fa:16:3e:a1:eb:73 ",
- " inet addr:172.16.13.2 Bcast:172.16.13.255 Mask:255.255.255.0",
- " inet6 addr: fe80::f816:3eff:fea1:eb73/64 Scope:Link",
- " UP BROADCAST RUNNING MULTICAST MTU:1450 Metric:1",
- " RX packets:28 errors:0 dropped:35 overruns:0 frame:0",
- " TX packets:8 errors:0 dropped:0 overruns:0 carrier:0",
- " collisions:0 txqueuelen:0 ",
- " RX bytes:4485 (4.4 KB) TX bytes:648 (648.0 B)",
- ""
+IP_ADDRESS_SHOW_RESULT = [
+ "1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1",
+ " link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00",
+ " inet 127.0.0.1/8 scope host lo",
+ " valid_lft forever preferred_lft forever",
+ " inet6 ::1/128 scope host",
+ " valid_lft forever preferred_lft forever",
+ "2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000",
+ " link/ether 00:50:56:ac:50:2a brd ff:ff:ff:ff:ff:ff",
+ " inet 10.56.20.80/24 brd 10.56.20.255 scope global eth0",
+ " valid_lft forever preferred_lft forever",
+ " inet6 2001:420:4482:24c1:250:56ff:feac:502a/64 scope global mngtmpaddr dynamic",
+ " valid_lft 2591971sec preferred_lft 604771sec",
+ " inet6 fe80::250:56ff:feac:502a/64 scope link",
+ " valid_lft forever preferred_lft forever"
]
-MAC_ADDRESS_LINE = "tapa68b2627-a1 Link encap:Ethernet HWaddr 00:50:56:ac:e8:97 "
-MAC_ADDRESS = "00:50:56:ac:e8:97"
-IPV6_ADDRESS_LINE = " inet6 addr: fe80::f816:3eff:fea1:eb73/64 Scope:Link"
-IPV6_ADDRESS = "fe80::f816:3eff:fea1:eb73/64"
-IPV4_ADDRESS_LINE = " inet addr:172.16.13.2 Bcast:172.16.13.255 Mask:255.255.255.0"
-IPV4_ADDRESS = "172.16.13.2"
+MAC_ADDRESS_LINE = " link/ether 00:50:56:ac:50:2a brd ff:ff:ff:ff:ff:ff"
+MAC_ADDRESS = "00:50:56:ac:50:2a"
+IPV6_ADDRESS_LINE = " inet6 2001:420:4482:24c1:250:56ff:feac:502a/64 scope global mngtmpaddr dynamic"
+IPV6_ADDRESS = "2001:420:4482:24c1:250:56ff:feac:502a"
+IPV4_ADDRESS_LINE = " inet 10.56.20.80/24 brd 10.56.20.255 scope global eth0"
+IPV4_ADDRESS = "10.56.20.80"
# functional test
INPUT = "node-6.cisco.com"
},
"type": "vedge"
}
-VEDGE_WITHOUT_CONFIGS ={
+VEDGE_WITHOUT_CONFIGS = {
}
VEDGE_WITHOUT_TUNNEL_TYPES = {
}
}
HOST_ID = "node-5.cisco.com"
-IFCONFIG_LINES = [
- "br-mesh Link encap:Ethernet HWaddr 00:50:56:ac:28:9d ",
- " inet addr:192.168.2.1 Bcast:192.168.2.255 Mask:255.255.255.0",
- " inet6 addr: fe80::d4e1:8fff:fe33:ed6a/64 Scope:Link",
- " UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1",
- " RX packets:2273307 errors:0 dropped:0 overruns:0 frame:0",
- " TX packets:2255930 errors:0 dropped:0 overruns:0 carrier:0",
- " collisions:0 txqueuelen:0 ",
- " RX bytes:578536155 (578.5 MB) TX bytes:598541522 (598.5 MB)",
- ""
+IP_ADDRESS_SHOW_LINES = [
+ "2: br-mesh: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc "
+ "pfifo_fast state UP group default qlen 1000",
+ " link/ether 00:50:56:ac:28:9d brd ff:ff:ff:ff:ff:ff",
+ " inet 192.168.2.1/24 brd 192.168.2.255 scope global br-mesh",
+ " valid_lft forever preferred_lft forever",
+ " inet6 fe80::d4e1:8fff:fe33:ed6a/64 scope link",
+ " valid_lft 2591951sec preferred_lft 604751sec"
]
+
OTEP_WITH_CONNECTOR = {
"host": "node-5.cisco.com",
"ip_address": "192.168.2.1",
test_case["err_msg"])
def test_get_vconnectors(self):
- self.fetcher.run_fetch_lines = MagicMock(return_value=IFCONFIG_LINES)
- self.fetcher.get_vconnector(OTEP_FOR_GETTING_VECONNECTOR,
+ self.fetcher.run_fetch_lines = \
+ MagicMock(return_value=IP_ADDRESS_SHOW_LINES)
+ otep_to_get_vconnector = copy.deepcopy(OTEP_FOR_GETTING_VECONNECTOR)
+ self.fetcher.get_vconnector(otep_to_get_vconnector,
HOST_ID, VEDGE)
- self.assertEqual(OTEP_FOR_GETTING_VECONNECTOR, OTEP_WITH_CONNECTOR,
+ self.assertEqual(otep_to_get_vconnector, OTEP_WITH_CONNECTOR,
"Can't get vconnector from the config lines for otep")
self.set_collection("messages")
self.set_collection("environments_config")
self.set_collection("supported_environments")
+ self.set_collection("connection_tests")
self.set_collection("constants",
use_default_name=True)
self.set_collection("monitoring_config_templates",