Add input validation in substitution_mapping class
[parser.git] / tosca2heat / tosca-parser / toscaparser / imports.py
1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
2 #    not use this file except in compliance with the License. You may obtain
3 #    a copy of the License at
4 #
5 #         http://www.apache.org/licenses/LICENSE-2.0
6 #
7 #    Unless required by applicable law or agreed to in writing, software
8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10 #    License for the specific language governing permissions and limitations
11 #    under the License.
12
13 import logging
14 import os
15
16 from toscaparser.common.exception import ExceptionCollector
17 from toscaparser.common.exception import InvalidPropertyValueError
18 from toscaparser.common.exception import MissingRequiredFieldError
19 from toscaparser.common.exception import UnknownFieldError
20 from toscaparser.common.exception import ValidationError
21 from toscaparser.elements.tosca_type_validation import TypeValidation
22 from toscaparser.utils.gettextutils import _
23 import toscaparser.utils.urlutils
24 import toscaparser.utils.yamlparser
25
# Loader used for every imported template. It is invoked in this module as
# YAML_LOADER(location, a_file), with a_file=False when location is a URL.
YAML_LOADER = toscaparser.utils.yamlparser.load_yaml
# Module logger, registered under the name "tosca".
log = logging.getLogger("tosca")
28
29
class ImportsLoader(object):
    """Resolve and load the entries of a template's "imports" section.

    Construction immediately validates and loads every import. The results
    are exposed as the merged custom type definitions (``custom_defs``) and
    the list of loaded templates keyed by their location
    (``nested_tosca_tpls``). Problems are logged and reported through
    ExceptionCollector.
    """

    # Keynames accepted in the dict form of an import definition; the
    # individual names are also bound as class attributes (FILE, ...).
    IMPORTS_SECTION = (FILE, REPOSITORY, NAMESPACE_URI, NAMESPACE_PREFIX) = \
                      ('file', 'repository', 'namespace_uri',
                       'namespace_prefix')
35
36     def __init__(self, importslist, path, type_definition_list=None,
37                  tpl=None):
38         self.importslist = importslist
39         self.custom_defs = {}
40         self.nested_tosca_tpls = []
41         if not path and not tpl:
42             msg = _('Input tosca template is not provided.')
43             log.warning(msg)
44             ExceptionCollector.appendException(ValidationError(message=msg))
45         self.path = path
46         self.repositories = {}
47         if tpl and tpl.get('repositories'):
48             self.repositories = tpl.get('repositories')
49         self.type_definition_list = []
50         if type_definition_list:
51             if isinstance(type_definition_list, list):
52                 self.type_definition_list = type_definition_list
53             else:
54                 self.type_definition_list.append(type_definition_list)
55         self._validate_and_load_imports()
56
57     def get_custom_defs(self):
58         return self.custom_defs
59
60     def get_nested_tosca_tpls(self):
61         return self.nested_tosca_tpls
62
63     def _validate_and_load_imports(self):
64         imports_names = set()
65
66         if not self.importslist:
67             msg = _('"imports" keyname is defined without including '
68                     'templates.')
69             log.error(msg)
70             ExceptionCollector.appendException(ValidationError(message=msg))
71             return
72
73         for import_def in self.importslist:
74             if isinstance(import_def, dict):
75                 for import_name, import_uri in import_def.items():
76                     if import_name in imports_names:
77                         msg = (_('Duplicate import name "%s" was found.') %
78                                import_name)
79                         log.error(msg)
80                         ExceptionCollector.appendException(
81                             ValidationError(message=msg))
82                     imports_names.add(import_name)
83
84                     full_file_name, custom_type = self._load_import_template(
85                         import_name, import_uri)
86                     namespace_prefix = None
87                     if isinstance(import_uri, dict):
88                         namespace_prefix = import_uri.get(
89                             self.NAMESPACE_PREFIX)
90                     if custom_type:
91                         TypeValidation(custom_type, import_def)
92                         self._update_custom_def(custom_type, namespace_prefix)
93             else:  # old style of imports
94                 full_file_name, custom_type = self._load_import_template(
95                     None, import_def)
96                 if custom_type:
97                     TypeValidation(
98                         custom_type, import_def)
99                     self._update_custom_def(custom_type, None)
100
101             self._update_nested_tosca_tpls(full_file_name, custom_type)
102
103     def _update_custom_def(self, custom_type, namespace_prefix):
104         outer_custom_types = {}
105         for type_def in self.type_definition_list:
106             outer_custom_types = custom_type.get(type_def)
107             if outer_custom_types:
108                 if type_def == "imports":
109                     self.custom_defs.update({'imports': outer_custom_types})
110                 else:
111                     if namespace_prefix:
112                         prefix_custom_types = {}
113                         for type_def_key in outer_custom_types.keys():
114                             namespace_prefix_to_key = (namespace_prefix +
115                                                        "." + type_def_key)
116                             prefix_custom_types[namespace_prefix_to_key] = \
117                                 outer_custom_types[type_def_key]
118                         self.custom_defs.update(prefix_custom_types)
119                     else:
120                         self.custom_defs.update(outer_custom_types)
121
122     def _update_nested_tosca_tpls(self, full_file_name, custom_tpl):
123         if full_file_name and custom_tpl:
124             topo_tpl = {full_file_name: custom_tpl}
125             self.nested_tosca_tpls.append(topo_tpl)
126
127     def _validate_import_keys(self, import_name, import_uri_def):
128         if self.FILE not in import_uri_def.keys():
129             log.warning(_('Missing keyname "file" in import "%(name)s".')
130                         % {'name': import_name})
131             ExceptionCollector.appendException(
132                 MissingRequiredFieldError(
133                     what='Import of template "%s"' % import_name,
134                     required=self.FILE))
135         for key in import_uri_def.keys():
136             if key not in self.IMPORTS_SECTION:
137                 log.warning(_('Unknown keyname "%(key)s" error in '
138                               'imported definition "%(def)s".')
139                             % {'key': key, 'def': import_name})
140                 ExceptionCollector.appendException(
141                     UnknownFieldError(
142                         what='Import of template "%s"' % import_name,
143                         field=key))
144
145     def _load_import_template(self, import_name, import_uri_def):
146         """Handle custom types defined in imported template files
147
148         This method loads the custom type definitions referenced in "imports"
149         section of the TOSCA YAML template by determining whether each import
150         is specified via a file reference (by relative or absolute path) or a
151         URL reference.
152
153         Possibilities:
154         +----------+--------+------------------------------+
155         | template | import | comment                      |
156         +----------+--------+------------------------------+
157         | file     | file   | OK                           |
158         | file     | URL    | OK                           |
159         | preparsed| file   | file must be a full path     |
160         | preparsed| URL    | OK                           |
161         | URL      | file   | file must be a relative path |
162         | URL      | URL    | OK                           |
163         +----------+--------+------------------------------+
164         """
165         short_import_notation = False
166         if isinstance(import_uri_def, dict):
167             self._validate_import_keys(import_name, import_uri_def)
168             file_name = import_uri_def.get(self.FILE)
169             repository = import_uri_def.get(self.REPOSITORY)
170             repos = self.repositories.keys()
171             if repository is not None:
172                 if repository not in repos:
173                     ExceptionCollector.appendException(
174                         InvalidPropertyValueError(
175                             what=_('Repository is not found in "%s"') % repos))
176         else:
177             file_name = import_uri_def
178             repository = None
179             short_import_notation = True
180
181         if not file_name:
182             msg = (_('A template file name is not provided with import '
183                      'definition "%(import_name)s".')
184                    % {'import_name': import_name})
185             log.error(msg)
186             ExceptionCollector.appendException(ValidationError(message=msg))
187             return None, None
188
189         if toscaparser.utils.urlutils.UrlUtils.validate_url(file_name):
190             return file_name, YAML_LOADER(file_name, False)
191         elif not repository:
192             import_template = None
193             if self.path:
194                 if toscaparser.utils.urlutils.UrlUtils.validate_url(self.path):
195                     if os.path.isabs(file_name):
196                         msg = (_('Absolute file name "%(name)s" cannot be '
197                                  'used in a URL-based input template '
198                                  '"%(template)s".')
199                                % {'name': file_name, 'template': self.path})
200                         log.error(msg)
201                         ExceptionCollector.appendException(ImportError(msg))
202                         return None, None
203                     import_template = toscaparser.utils.urlutils.UrlUtils.\
204                         join_url(self.path, file_name)
205                     a_file = False
206                 else:
207                     a_file = True
208                     main_a_file = os.path.isfile(self.path)
209
210                     if main_a_file:
211                         if os.path.isfile(file_name):
212                             import_template = file_name
213                         else:
214                             full_path = os.path.join(
215                                 os.path.dirname(os.path.abspath(self.path)),
216                                 file_name)
217                             if os.path.isfile(full_path):
218                                 import_template = full_path
219                             else:
220                                 file_path = file_name.rpartition("/")
221                                 dir_path = os.path.dirname(os.path.abspath(
222                                     self.path))
223                                 if file_path[0] != '' and dir_path.endswith(
224                                     file_path[0]):
225                                         import_template = dir_path + "/" +\
226                                             file_path[2]
227                                         if not os.path.isfile(import_template):
228                                             msg = (_('"%(import_template)s" is'
229                                                      'not a valid file')
230                                                    % {'import_template':
231                                                       import_template})
232                                             log.error(msg)
233                                             ExceptionCollector.appendException
234                                             (ValueError(msg))
235             else:  # template is pre-parsed
236                 if os.path.isabs(file_name) and os.path.isfile(file_name):
237                     a_file = True
238                     import_template = file_name
239                 else:
240                     msg = (_('Relative file name "%(name)s" cannot be used '
241                              'in a pre-parsed input template.')
242                            % {'name': file_name})
243                     log.error(msg)
244                     ExceptionCollector.appendException(ImportError(msg))
245                     return None, None
246
247             if not import_template:
248                 log.error(_('Import "%(name)s" is not valid.') %
249                           {'name': import_uri_def})
250                 ExceptionCollector.appendException(
251                     ImportError(_('Import "%s" is not valid.') %
252                                 import_uri_def))
253                 return None, None
254             return import_template, YAML_LOADER(import_template, a_file)
255
256         if short_import_notation:
257             log.error(_('Import "%(name)s" is not valid.') % import_uri_def)
258             ExceptionCollector.appendException(
259                 ImportError(_('Import "%s" is not valid.') % import_uri_def))
260             return None, None
261
262         full_url = ""
263         if repository:
264             if self.repositories:
265                 for repo_name, repo_def in self.repositories.items():
266                     if repo_name == repository:
267                         # Remove leading, ending spaces and strip
268                         # the last character if "/"
269                         repo_url = ((repo_def['url']).strip()).rstrip("//")
270                         full_url = repo_url + "/" + file_name
271
272             if not full_url:
273                 msg = (_('referenced repository "%(n_uri)s" in import '
274                          'definition "%(tpl)s" not found.')
275                        % {'n_uri': repository, 'tpl': import_name})
276                 log.error(msg)
277                 ExceptionCollector.appendException(ImportError(msg))
278                 return None, None
279
280         if toscaparser.utils.urlutils.UrlUtils.validate_url(full_url):
281             return full_url, YAML_LOADER(full_url, False)
282         else:
283             msg = (_('repository url "%(n_uri)s" is not valid in import '
284                      'definition "%(tpl)s".')
285                    % {'n_uri': repo_url, 'tpl': import_name})
286             log.error(msg)
287             ExceptionCollector.appendException(ImportError(msg))