Patch to make flavor configuration for tests more flexible.
[snaps.git] / snaps / file_utils.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import os
17 import logging
18
19 from cryptography.hazmat.primitives import serialization
20
21 try:
22     import urllib.request as urllib
23 except ImportError:
24     import urllib2 as urllib
25
26 import yaml
27
28 __author__ = 'spisarski'
29
30 """
31 Utilities for basic file handling
32 """
33
34 logger = logging.getLogger('file_utils')
35
36
def file_exists(file_path):
    """
    Returns True when the given path refers to an existing regular file
    :param file_path: the path to check; a leading '~' is expanded to the
                      user's home directory
    :return: True if the expanded path is an existing file, False when the
             path does not exist or is a directory (no exception is raised)
    """
    # os.path.isfile() is already False for missing paths and for
    # directories, so separate exists()/isdir() checks are unnecessary
    return os.path.isfile(os.path.expanduser(file_path))
49
50
def download(url, dest_path, name=None):
    """
    Download a file to a destination path given a URL
    :param url: the endpoint to the file to download
    :param dest_path: the directory to save the file (created, including
                      missing parents, when it does not exist)
    :param name: the file name (optional); defaults to the text after the
                 last '/' of the URL
    :return: the file object the content was written to; it is already
             closed, its 'name' attribute holds the destination path
    """
    if not name:
        name = url.rsplit('/')[-1]
    dest = os.path.join(dest_path, name)
    logger.debug('Downloading file from - ' + url)

    if not os.path.isdir(dest_path):
        os.makedirs(dest_path)

    # Obtain the response before creating the destination file so that a
    # failed request does not leave an empty file behind
    response = __get_url_response(url)
    try:
        with open(dest, 'wb') as download_file:
            logger.debug('Saving file to - %s',
                         os.path.abspath(download_file.name))
            # Copy in chunks instead of holding the whole payload in memory
            for chunk in iter(lambda: response.read(64 * 1024), b''):
                download_file.write(chunk)
    finally:
        # The response holds a network connection; always release it
        response.close()
    return download_file
81
82
def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
    """
    Saves the generated RSA keys to the filesystem
    The public key is written in OpenSSH format and the private key as an
    unencrypted PEM (TraditionalOpenSSL); both files end up with mode 0400.
    Nothing is written when keys is not given.
    :param keys: the keys to save generated by cryptography
    :param pub_file_path: the path to the public keys ('~' is expanded)
    :param priv_file_path: the path to the private keys ('~' is expanded)
    """
    if not keys:
        return

    if pub_file_path:
        # Serialize before touching the filesystem so a serialization error
        # does not leave an empty key file behind
        public_bytes = keys.public_key().public_bytes(
            serialization.Encoding.OpenSSH,
            serialization.PublicFormat.OpenSSH)
        saved_path = _write_read_only_file(pub_file_path, public_bytes)
        logger.info("Saved public key to - " + saved_path)

    if priv_file_path:
        private_bytes = keys.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption())
        saved_path = _write_read_only_file(priv_file_path, private_bytes)
        logger.info("Saved private key to - " + saved_path)


def _write_read_only_file(file_path, content):
    """
    Writes bytes to a file, creating its parent directory when missing, and
    sets the file's mode to 0400
    :param file_path: the destination path ('~' is expanded)
    :param content: the bytes to write
    :return: the expanded path that was written
    """
    expanded_path = os.path.expanduser(file_path)
    parent_dir = os.path.dirname(expanded_path)
    # dirname is '' for a bare file name; there is no directory to create
    # in that case and os.mkdir('') would raise
    if parent_dir and not os.path.isdir(parent_dir):
        os.makedirs(parent_dir)

    with open(expanded_path, 'wb') as handle:
        handle.write(content)

    os.chmod(expanded_path, 0o400)
    return expanded_path
133
134
def save_string_to_file(string, file_path, mode=None):
    """
    Writes a string to a file and optionally changes the file's permissions
    :param string: the string contents to store
    :param file_path: the path of the file to create or overwrite
    :param mode: the permission bits to apply with chmod (optional; no chmod
                 is performed when the value is None or 0)
    :return: the file object, which is closed by the time it is returned
    """
    with open(file_path, 'w') as out_file:
        out_file.write(string)
        if mode:
            os.chmod(file_path, mode)
    return out_file
151
152
def get_content_length(url):
    """
    Returns the number of bytes to be downloaded from the given URL
    :param url: the URL to inspect
    :return: the value of the response's 'Content-Length' header, exactly as
             found in the response headers (it is not converted to an int)
    """
    response = __get_url_response(url)
    try:
        return response.headers['Content-Length']
    finally:
        # Only the headers are needed; release the connection without
        # reading the body
        response.close()
161
162
def __get_url_response(url):
    """
    Opens the given URL using an opener built with an empty proxy mapping
    :param url: the URL to open
    :return: the response object returned by urlopen
    """
    # An empty mapping is passed to ProxyHandler; the resulting opener is
    # installed before the URL is opened
    no_proxy_opener = urllib.build_opener(urllib.ProxyHandler({}))
    urllib.install_opener(no_proxy_opener)
    return urllib.urlopen(url)
173
174
def read_yaml(config_file_path):
    """
    Loads a YAML file with yaml.safe_load and returns the parsed content
    :param config_file_path: The file path to config
    :return: the object produced by yaml.safe_load (a dictionary when the
             document is a mapping)
    """
    logger.debug('Attempting to load configuration file - ' + config_file_path)
    with open(config_file_path, 'r') as yaml_stream:
        try:
            parsed = yaml.safe_load(yaml_stream)
            logger.info('Loaded configuration')
        finally:
            # Logged whenever the file was opened, even if parsing failed;
            # the with-block closes the stream right after
            logger.info('Closing configuration file')
    return parsed
192
193
def persist_dict_to_yaml(the_dict, file_name):
    """
    Serializes a dict to YAML (block style) and writes it to a file
    :param the_dict: the dictionary to store
    :param file_name: the path of the file to write ('~' is expanded)
    :return: the file object returned by save_string_to_file
    """
    logger.info('Persisting %s to [%s]', the_dict, file_name)
    target_path = os.path.expanduser(file_name)
    return save_string_to_file(
        yaml.dump(the_dict, default_flow_style=False, default_style=''),
        target_path)
206
207
def read_os_env_file(os_env_filename):
    """
    Reads the OS environment source file and returns a map of each key/value
    Only lines of the form 'export KEY=VALUE' (optionally indented) are
    considered, so comment lines beginning with '#' are ignored. Leading and
    trailing single or double quotes are removed from the value.
    :param os_env_filename: The name of the OS environment file to read
    :return: a dictionary, or None when no file name is given
    """
    if not os_env_filename:
        return None

    logger.info('Attempting to read OS environment file - %s',
                os_env_filename)
    export_prefix = 'export '
    out = {}
    with open(os_env_filename) as env_file:
        for line in env_file:
            line = line.lstrip()
            if not line.startswith(export_prefix):
                continue

            # Slice the prefix off: str.lstrip('export ') treats its
            # argument as a set of characters and would also eat the start
            # of keys such as 'proxy' or 'text'
            assignment = line[len(export_prefix):].strip()

            # Split on the first '=' only so values that themselves contain
            # '=' (URLs with query strings, base64 text) are kept intact
            key, separator, value = assignment.partition('=')
            if separator:
                # Remove leading and trailing ' & " characters from value
                value = value.lstrip('\'').lstrip('\"')
                out[key] = value.rstrip('\'').rstrip('\"')
    return out
236
237
def read_file(filename):
    """
    Returns the contents of a file as a string
    :param filename: the name of the file
    :return: the complete text of the file ('' for an empty file)
    """
    # A single read() replaces the per-line string concatenation and the
    # with-block guarantees the handle is closed
    with open(filename) as the_file:
        return the_file.read()