fix error when run unittest case
[parser.git] / tosca2heat / tosca-parser / toscaparser / prereq / csar.py
1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
2 #    not use this file except in compliance with the License. You may obtain
3 #    a copy of the License at
4 #
5 #         http://www.apache.org/licenses/LICENSE-2.0
6 #
7 #    Unless required by applicable law or agreed to in writing, software
8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10 #    License for the specific language governing permissions and limitations
11 #    under the License.
12
13 import os.path
14 import requests
15 import shutil
16 import six
17 import tempfile
18 import yaml
19 import zipfile
20
21 from toscaparser.common.exception import ExceptionCollector
22 from toscaparser.common.exception import URLException
23 from toscaparser.common.exception import ValidationError
24 from toscaparser.imports import ImportsLoader
25 from toscaparser.utils.gettextutils import _
26 from toscaparser.utils.urlutils import UrlUtils
27
28 try:  # Python 2.x
29     from BytesIO import BytesIO
30 except ImportError:  # Python 3.x
31     from io import BytesIO
32
33
34 class CSAR(object):
35
36     def __init__(self, csar_file, a_file=True):
37         self.path = csar_file
38         self.a_file = a_file
39         self.is_validated = False
40         self.error_caught = False
41         self.csar = None
42         self.temp_dir = None
43
44     def validate(self):
45         """Validate the provided CSAR file."""
46
47         self.is_validated = True
48
49         # validate that the file or URL exists
50         missing_err_msg = (_('"%s" does not exist.') % self.path)
51         if self.a_file:
52             if not os.path.isfile(self.path):
53                 ExceptionCollector.appendException(
54                     ValidationError(message=missing_err_msg))
55                 return False
56             else:
57                 self.csar = self.path
58         else:  # a URL
59             if not UrlUtils.validate_url(self.path):
60                 ExceptionCollector.appendException(
61                     ValidationError(message=missing_err_msg))
62                 return False
63             else:
64                 response = requests.get(self.path)
65                 self.csar = BytesIO(response.content)
66
67         # validate that it is a valid zip file
68         if not zipfile.is_zipfile(self.csar):
69             err_msg = (_('"%s" is not a valid zip file.') % self.path)
70             ExceptionCollector.appendException(
71                 ValidationError(message=err_msg))
72             return False
73
74         # validate that it contains the metadata file in the correct location
75         self.zfile = zipfile.ZipFile(self.csar, 'r')
76         filelist = self.zfile.namelist()
77         if 'TOSCA-Metadata/TOSCA.meta' not in filelist:
78             err_msg = (_('"%s" is not a valid CSAR as it does not contain the '
79                          'required file "TOSCA.meta" in the folder '
80                          '"TOSCA-Metadata".') % self.path)
81             ExceptionCollector.appendException(
82                 ValidationError(message=err_msg))
83             return False
84
85         # validate that 'Entry-Definitions' property exists in TOSCA.meta
86         data = self.zfile.read('TOSCA-Metadata/TOSCA.meta')
87         invalid_yaml_err_msg = (_('The file "TOSCA-Metadata/TOSCA.meta" in '
88                                   'the CSAR "%s" does not contain valid YAML '
89                                   'content.') % self.path)
90         try:
91             meta = yaml.load(data)
92             if type(meta) is dict:
93                 self.metadata = meta
94             else:
95                 ExceptionCollector.appendException(
96                     ValidationError(message=invalid_yaml_err_msg))
97                 return False
98         except yaml.YAMLError:
99             ExceptionCollector.appendException(
100                 ValidationError(message=invalid_yaml_err_msg))
101             return False
102
103         if 'Entry-Definitions' not in self.metadata:
104             err_msg = (_('The CSAR "%s" is missing the required metadata '
105                          '"Entry-Definitions" in '
106                          '"TOSCA-Metadata/TOSCA.meta".')
107                        % self.path)
108             ExceptionCollector.appendException(
109                 ValidationError(message=err_msg))
110             return False
111
112         # validate that 'Entry-Definitions' metadata value points to an
113         # existing file in the CSAR
114         entry = self.metadata.get('Entry-Definitions')
115         if entry and entry not in filelist:
116             err_msg = (_('The "Entry-Definitions" file defined in the '
117                          'CSAR "%s" does not exist.') % self.path)
118             ExceptionCollector.appendException(
119                 ValidationError(message=err_msg))
120             return False
121
122         # validate that external references in the main template actually
123         # exist and are accessible
124         self._validate_external_references()
125         return not self.error_caught
126
127     def get_metadata(self):
128         """Return the metadata dictionary."""
129
130         # validate the csar if not already validated
131         if not self.is_validated:
132             self.validate()
133
134         # return a copy to avoid changes overwrite the original
135         return dict(self.metadata) if self.metadata else None
136
137     def _get_metadata(self, key):
138         if not self.is_validated:
139             self.validate()
140         return self.metadata.get(key)
141
142     def get_author(self):
143         return self._get_metadata('Created-By')
144
145     def get_version(self):
146         return self._get_metadata('CSAR-Version')
147
148     def get_main_template(self):
149         entry_def = self._get_metadata('Entry-Definitions')
150         if entry_def in self.zfile.namelist():
151             return entry_def
152
153     def get_main_template_yaml(self):
154         main_template = self.get_main_template()
155         if main_template:
156             data = self.zfile.read(main_template)
157             invalid_tosca_yaml_err_msg = (
158                 _('The file "%(template)s" in the CSAR "%(csar)s" does not '
159                   'contain valid TOSCA YAML content.') %
160                 {'template': main_template, 'csar': self.path})
161             try:
162                 tosca_yaml = yaml.load(data)
163                 if type(tosca_yaml) is not dict:
164                     ExceptionCollector.appendException(
165                         ValidationError(message=invalid_tosca_yaml_err_msg))
166                 return tosca_yaml
167             except Exception:
168                 ExceptionCollector.appendException(
169                     ValidationError(message=invalid_tosca_yaml_err_msg))
170
171     def get_description(self):
172         desc = self._get_metadata('Description')
173         if desc is not None:
174             return desc
175
176         self.metadata['Description'] = \
177             self.get_main_template_yaml().get('description')
178         return self.metadata['Description']
179
180     def decompress(self):
181         if not self.is_validated:
182             self.validate()
183         self.temp_dir = tempfile.NamedTemporaryFile().name
184         with zipfile.ZipFile(self.csar, "r") as zf:
185             zf.extractall(self.temp_dir)
186
187     def _validate_external_references(self):
188         """Extracts files referenced in the main template
189
190         These references are currently supported:
191         * imports
192         * interface implementations
193         * artifacts
194         """
195         try:
196             self.decompress()
197             main_tpl_file = self.get_main_template()
198             if not main_tpl_file:
199                 return
200             main_tpl = self.get_main_template_yaml()
201
202             if 'imports' in main_tpl:
203                 ImportsLoader(main_tpl['imports'],
204                               os.path.join(self.temp_dir, main_tpl_file))
205
206             if 'topology_template' in main_tpl:
207                 topology_template = main_tpl['topology_template']
208
209                 if 'node_templates' in topology_template:
210                     node_templates = topology_template['node_templates']
211
212                     for node_template_key in node_templates:
213                         node_template = node_templates[node_template_key]
214                         if 'artifacts' in node_template:
215                             artifacts = node_template['artifacts']
216                             for artifact_key in artifacts:
217                                 artifact = artifacts[artifact_key]
218                                 if isinstance(artifact, six.string_types):
219                                     self._validate_external_reference(
220                                         main_tpl_file,
221                                         artifact)
222                                 elif isinstance(artifact, dict):
223                                     if 'file' in artifact:
224                                         self._validate_external_reference(
225                                             main_tpl_file,
226                                             artifact['file'])
227                                 else:
228                                     ExceptionCollector.appendException(
229                                         ValueError(_('Unexpected artifact '
230                                                      'definition for "%s".')
231                                                    % artifact_key))
232                                     self.error_caught = True
233                         if 'interfaces' in node_template:
234                             interfaces = node_template['interfaces']
235                             for interface_key in interfaces:
236                                 interface = interfaces[interface_key]
237                                 for opertation_key in interface:
238                                     operation = interface[opertation_key]
239                                     if isinstance(operation, six.string_types):
240                                         self._validate_external_reference(
241                                             main_tpl_file,
242                                             operation,
243                                             False)
244                                     elif isinstance(operation, dict):
245                                         if 'implementation' in operation:
246                                             self._validate_external_reference(
247                                                 main_tpl_file,
248                                                 operation['implementation'])
249         finally:
250             if self.temp_dir:
251                 shutil.rmtree(self.temp_dir)
252
253     def _validate_external_reference(self, tpl_file, resource_file,
254                                      raise_exc=True):
255         """Verify that the external resource exists
256
257         If resource_file is a URL verify that the URL is valid.
258         If resource_file is a relative path verify that the path is valid
259         considering base folder (self.temp_dir) and tpl_file.
260         Note that in a CSAR resource_file cannot be an absolute path.
261         """
262         if UrlUtils.validate_url(resource_file):
263             msg = (_('The resource at "%s" cannot be accessed.') %
264                    resource_file)
265             try:
266                 if UrlUtils.url_accessible(resource_file):
267                     return
268                 else:
269                     ExceptionCollector.appendException(
270                         URLException(what=msg))
271                     self.error_caught = True
272             except Exception:
273                 ExceptionCollector.appendException(
274                     URLException(what=msg))
275                 self.error_caught = True
276
277         if os.path.isfile(os.path.join(self.temp_dir,
278                                        os.path.dirname(tpl_file),
279                                        resource_file)):
280             return
281
282         if raise_exc:
283             ExceptionCollector.appendException(
284                 ValueError(_('The resource "%s" does not exist.')
285                            % resource_file))
286             self.error_caught = True