import logging
import os
+from copy import deepcopy
from toscaparser.common.exception import ExceptionCollector
from toscaparser.common.exception import InvalidTemplateVersion
from toscaparser.common.exception import MissingRequiredFieldError
'''Load the template data.'''
def __init__(self, path=None, parsed_params=None, a_file=True,
- yaml_dict_tpl=None):
- ExceptionCollector.start()
+ yaml_dict_tpl=None, sub_mapped_node_template=None):
+ if sub_mapped_node_template is None:
+ ExceptionCollector.start()
self.a_file = a_file
self.input_path = None
self.path = None
self.tpl = None
- self.nested_tosca_template = []
+ self.sub_mapped_node_template = sub_mapped_node_template
+ self.nested_tosca_tpls_with_topology = {}
+ self.nested_tosca_templates_with_topology = []
if path:
self.input_path = path
self.path = self._get_path(path)
self.relationship_templates = self._relationship_templates()
self.nodetemplates = self._nodetemplates()
self.outputs = self._outputs()
+ self._handle_nested_tosca_templates_with_topology()
self.graph = ToscaGraph(self.nodetemplates)
- ExceptionCollector.stop()
+ if sub_mapped_node_template is None:
+ ExceptionCollector.stop()
self.verify_template()
def _topology_template(self):
return TopologyTemplate(self._tpl_topology_template(),
self._get_all_custom_defs(),
self.relationship_types,
- self.parsed_params)
+ self.parsed_params,
+ self.sub_mapped_node_template)
def _inputs(self):
return self.topology_template.inputs
imports = self._tpl_imports()
if imports:
- custom_service = toscaparser.imports.\
- ImportsLoader(imports, self.path,
- type_defs, self.tpl)
+ custom_service = \
+ toscaparser.imports.ImportsLoader(imports, self.path,
+ type_defs, self.tpl)
- nested_topo_tpls = custom_service.get_nested_topo_tpls()
- self._handle_nested_topo_tpls(nested_topo_tpls)
+ nested_tosca_tpls = custom_service.get_nested_tosca_tpls()
+ self._update_nested_tosca_tpls_with_topology(nested_tosca_tpls)
custom_defs = custom_service.get_custom_defs()
if not custom_defs:
custom_defs.update(inner_custom_types)
return custom_defs
- def _handle_nested_topo_tpls(self, nested_topo_tpls):
- for tpl in nested_topo_tpls:
+ def _update_nested_tosca_tpls_with_topology(self, nested_tosca_tpls):
+ for tpl in nested_tosca_tpls:
filename, tosca_tpl = list(tpl.items())[0]
- if tosca_tpl.get(TOPOLOGY_TEMPLATE):
- nested_template = ToscaTemplate(
- path=filename, parsed_params=self.parsed_params,
- yaml_dict_tpl=tosca_tpl)
- if nested_template.topology_template.substitution_mappings:
- self.nested_tosca_template.append(nested_template)
+ if (tosca_tpl.get(TOPOLOGY_TEMPLATE) and
+ filename not in list(
+ self.nested_tosca_tpls_with_topology.keys())):
+ self.nested_tosca_tpls_with_topology.update(tpl)
+
+ def _handle_nested_tosca_templates_with_topology(self):
+ for fname, tosca_tpl in self.nested_tosca_tpls_with_topology.items():
+ for nodetemplate in self.nodetemplates:
+ if self._is_sub_mapped_node(nodetemplate, tosca_tpl):
+ parsed_params = self._get_params_for_nested_template(
+ nodetemplate)
+ nested_template = ToscaTemplate(
+ path=fname, parsed_params=parsed_params,
+ yaml_dict_tpl=tosca_tpl,
+ sub_mapped_node_template=nodetemplate)
+ if nested_template.has_substitution_mappings():
+ # Record the nested templates in top level template
+ self.nested_tosca_templates_with_topology.\
+ append(nested_template)
+ # Set the substitution toscatemplate for mapped node
+ nodetemplate.sub_mapping_tosca_template = \
+ nested_template
def _validate_field(self):
version = self._tpl_version()
msg = _('The pre-parsed input successfully passed validation.')
log.info(msg)
+
+ def _is_sub_mapped_node(self, nodetemplate, tosca_tpl):
+ """Return True if the nodetemple is substituted."""
+ if (nodetemplate and not nodetemplate.substitution_mapped and
+ self.get_sub_mapping_node_type(tosca_tpl) == nodetemplate.type
+ and len(nodetemplate.interfaces) < 1):
+ return True
+ else:
+ return False
+
+ def _get_params_for_nested_template(self, nodetemplate):
+ """Return total params for nested_template."""
+ parsed_params = deepcopy(self.parsed_params) \
+ if self.parsed_params else {}
+ if nodetemplate:
+ for pname in nodetemplate.get_properties():
+ parsed_params.update({pname:
+ nodetemplate.get_property_value(pname)})
+ return parsed_params
+
+ def get_sub_mapping_node_type(self, tosca_tpl):
+ """Return substitution mappings node type."""
+ if tosca_tpl:
+ return TopologyTemplate.get_sub_mapping_node_type(
+ tosca_tpl.get(TOPOLOGY_TEMPLATE))
+
+ def has_substitution_mappings(self):
+ """Return True if the template has valid substitution mappings."""
+ return self.topology_template is not None and \
+ self.topology_template.substitution_mappings is not None
+
+ def has_nested_templates(self):
+ """Return True if the tosca template has nested templates."""
+ return self.nested_tosca_templates_with_topology is not None and \
+ len(self.nested_tosca_templates_with_topology) >= 1