1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
2 # not use this file except in compliance with the License. You may obtain
3 # a copy of the License at
5 # http://www.apache.org/licenses/LICENSE-2.0
7 # Unless required by applicable law or agreed to in writing, software
8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10 # License for the specific language governing permissions and limitations
21 from toscaparser.common.exception import ExceptionCollector
22 from toscaparser.common.exception import URLException
23 from toscaparser.common.exception import ValidationError
24 from toscaparser.imports import ImportsLoader
25 from toscaparser.utils.gettextutils import _
26 from toscaparser.utils.urlutils import UrlUtils
29 from BytesIO import BytesIO
30 except ImportError: # Python 3.x
31 from io import BytesIO
36 def __init__(self, csar_file, a_file=True):
39 self.is_validated = False
40 self.error_caught = False
45 """Validate the provided CSAR file."""
47 self.is_validated = True
49 # validate that the file or URL exists
50 missing_err_msg = (_('"%s" does not exist.') % self.path)
52 if not os.path.isfile(self.path):
53 ExceptionCollector.appendException(
54 ValidationError(message=missing_err_msg))
59 if not UrlUtils.validate_url(self.path):
60 ExceptionCollector.appendException(
61 ValidationError(message=missing_err_msg))
64 response = requests.get(self.path)
65 self.csar = BytesIO(response.content)
67 # validate that it is a valid zip file
68 if not zipfile.is_zipfile(self.csar):
69 err_msg = (_('"%s" is not a valid zip file.') % self.path)
70 ExceptionCollector.appendException(
71 ValidationError(message=err_msg))
74 # validate that it contains the metadata file in the correct location
75 self.zfile = zipfile.ZipFile(self.csar, 'r')
76 filelist = self.zfile.namelist()
77 if 'TOSCA-Metadata/TOSCA.meta' not in filelist:
78 err_msg = (_('"%s" is not a valid CSAR as it does not contain the '
79 'required file "TOSCA.meta" in the folder '
80 '"TOSCA-Metadata".') % self.path)
81 ExceptionCollector.appendException(
82 ValidationError(message=err_msg))
85 # validate that 'Entry-Definitions' property exists in TOSCA.meta
86 data = self.zfile.read('TOSCA-Metadata/TOSCA.meta')
87 invalid_yaml_err_msg = (_('The file "TOSCA-Metadata/TOSCA.meta" in '
88 'the CSAR "%s" does not contain valid YAML '
89 'content.') % self.path)
91 meta = yaml.load(data)
92 if type(meta) is dict:
95 ExceptionCollector.appendException(
96 ValidationError(message=invalid_yaml_err_msg))
98 except yaml.YAMLError:
99 ExceptionCollector.appendException(
100 ValidationError(message=invalid_yaml_err_msg))
103 if 'Entry-Definitions' not in self.metadata:
104 err_msg = (_('The CSAR "%s" is missing the required metadata '
105 '"Entry-Definitions" in '
106 '"TOSCA-Metadata/TOSCA.meta".')
108 ExceptionCollector.appendException(
109 ValidationError(message=err_msg))
112 # validate that 'Entry-Definitions' metadata value points to an
113 # existing file in the CSAR
114 entry = self.metadata.get('Entry-Definitions')
115 if entry and entry not in filelist:
116 err_msg = (_('The "Entry-Definitions" file defined in the '
117 'CSAR "%s" does not exist.') % self.path)
118 ExceptionCollector.appendException(
119 ValidationError(message=err_msg))
122 # validate that external references in the main template actually
123 # exist and are accessible
124 self._validate_external_references()
125 return not self.error_caught
127 def get_metadata(self):
128 """Return the metadata dictionary."""
130 # validate the csar if not already validated
131 if not self.is_validated:
134 # return a copy to avoid changes overwrite the original
135 return dict(self.metadata) if self.metadata else None
137 def _get_metadata(self, key):
138 if not self.is_validated:
140 return self.metadata.get(key)
142 def get_author(self):
143 return self._get_metadata('Created-By')
145 def get_version(self):
146 return self._get_metadata('CSAR-Version')
148 def get_main_template(self):
149 entry_def = self._get_metadata('Entry-Definitions')
150 if entry_def in self.zfile.namelist():
153 def get_main_template_yaml(self):
154 main_template = self.get_main_template()
156 data = self.zfile.read(main_template)
157 invalid_tosca_yaml_err_msg = (
158 _('The file "%(template)s" in the CSAR "%(csar)s" does not '
159 'contain valid TOSCA YAML content.') %
160 {'template': main_template, 'csar': self.path})
162 tosca_yaml = yaml.load(data)
163 if type(tosca_yaml) is not dict:
164 ExceptionCollector.appendException(
165 ValidationError(message=invalid_tosca_yaml_err_msg))
168 ExceptionCollector.appendException(
169 ValidationError(message=invalid_tosca_yaml_err_msg))
171 def get_description(self):
172 desc = self._get_metadata('Description')
176 self.metadata['Description'] = \
177 self.get_main_template_yaml().get('description')
178 return self.metadata['Description']
180 def decompress(self):
181 if not self.is_validated:
183 self.temp_dir = tempfile.NamedTemporaryFile().name
184 with zipfile.ZipFile(self.csar, "r") as zf:
185 zf.extractall(self.temp_dir)
187 def _validate_external_references(self):
188 """Extracts files referenced in the main template
190 These references are currently supported:
192 * interface implementations
197 main_tpl_file = self.get_main_template()
198 if not main_tpl_file:
200 main_tpl = self.get_main_template_yaml()
202 if 'imports' in main_tpl:
203 ImportsLoader(main_tpl['imports'],
204 os.path.join(self.temp_dir, main_tpl_file))
206 if 'topology_template' in main_tpl:
207 topology_template = main_tpl['topology_template']
209 if 'node_templates' in topology_template:
210 node_templates = topology_template['node_templates']
212 for node_template_key in node_templates:
213 node_template = node_templates[node_template_key]
214 if 'artifacts' in node_template:
215 artifacts = node_template['artifacts']
216 for artifact_key in artifacts:
217 artifact = artifacts[artifact_key]
218 if isinstance(artifact, six.string_types):
219 self._validate_external_reference(
222 elif isinstance(artifact, dict):
223 if 'file' in artifact:
224 self._validate_external_reference(
228 ExceptionCollector.appendException(
229 ValueError(_('Unexpected artifact '
230 'definition for "%s".')
232 self.error_caught = True
233 if 'interfaces' in node_template:
234 interfaces = node_template['interfaces']
235 for interface_key in interfaces:
236 interface = interfaces[interface_key]
237 for opertation_key in interface:
238 operation = interface[opertation_key]
239 if isinstance(operation, six.string_types):
240 self._validate_external_reference(
244 elif isinstance(operation, dict):
245 if 'implementation' in operation:
246 self._validate_external_reference(
248 operation['implementation'])
251 shutil.rmtree(self.temp_dir)
253 def _validate_external_reference(self, tpl_file, resource_file,
255 """Verify that the external resource exists
257 If resource_file is a URL verify that the URL is valid.
258 If resource_file is a relative path verify that the path is valid
259 considering base folder (self.temp_dir) and tpl_file.
260 Note that in a CSAR resource_file cannot be an absolute path.
262 if UrlUtils.validate_url(resource_file):
263 msg = (_('The resource at "%s" cannot be accessed.') %
266 if UrlUtils.url_accessible(resource_file):
269 ExceptionCollector.appendException(
270 URLException(what=msg))
271 self.error_caught = True
273 ExceptionCollector.appendException(
274 URLException(what=msg))
275 self.error_caught = True
277 if os.path.isfile(os.path.join(self.temp_dir,
278 os.path.dirname(tpl_file),
283 ExceptionCollector.appendException(
284 ValueError(_('The resource "%s" does not exist.')
286 self.error_caught = True