1 # Copyright 2015-2017 Intel Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Settings and configuration handlers.
17 Settings will be loaded from several .conf files
18 and any user provided settings file.
21 # pylint: disable=invalid-name
31 _LOGGER = logging.getLogger(__name__)
33 # Special test parameters which are not part of standard VSPERF configuration
34 _EXTRA_TEST_PARAMS = ['TUNNEL_TYPE']
36 # regex to parse configuration macros from 04_vnf.conf
37 # it will select all patterns starting with # sign
38 # and returns macro parameters and step
39 # examples of valid macros:
41 # #MAC(AA:BB:CC:DD:EE:FF) or #MAC(AA:BB:CC:DD:EE:FF,2)
42 # #IP(192.168.1.2) or #IP(192.168.1.2,2)
44 _PARSE_PATTERN = r'(#[A-Z]+)(\(([^(),]+)(,([0-9]+))?\))?'
46 class Settings(object):
47 """Holding class for settings.
52 def _eval_param(self, param):
53 # pylint: disable=invalid-name
54 """ Helper function for expansion of references to vsperf parameters
56 if isinstance(param, str):
57 # evaluate every #PARAM reference inside parameter itself
58 macros = re.findall(r'#PARAM\((([\w\-]+)(\[[\w\[\]\-\'\"]+\])*)\)', param)
61 # pylint: disable=eval-used
63 tmp_val = str(eval("self.getValue('{}'){}".format(macro[1], macro[2])))
64 param = param.replace('#PARAM({})'.format(macro[0]), tmp_val)
65 # silently ignore that option required by PARAM macro can't be evaluated;
66 # It is possible, that referred parameter will be constructed during runtime
70 except AttributeError:
73 elif isinstance(param, list) or isinstance(param, tuple):
76 tmp_list.append(self._eval_param(item))
78 elif isinstance(param, dict):
80 for (key, value) in param.items():
81 tmp_dict[key] = self._eval_param(value)
86 def getValue(self, attr):
87 """Return a settings item value
89 if attr in self.__dict__:
90 if attr == 'TEST_PARAMS':
91 return getattr(self, attr)
93 master_value = getattr(self, attr)
94 # Check if parameter value was modified by CLI option
95 cli_value = get_test_param(attr, None)
96 if cli_value is not None:
97 # TRAFFIC dictionary is not overridden by CLI option
98 # but only updated by specified values
100 tmp_value = copy.deepcopy(master_value)
101 tmp_value = merge_spec(tmp_value, cli_value)
102 return self._eval_param(tmp_value)
104 return self._eval_param(cli_value)
106 return self._eval_param(master_value)
108 raise AttributeError("%r object has no attribute %r" %
109 (self.__class__, attr))
111 def __setattr__(self, name, value):
114 # skip non-settings. this should exclude built-ins amongst others
115 if not name.isupper():
118 # we can assume all uppercase keys are valid settings
119 super(Settings, self).__setattr__(name, value)
121 def setValue(self, name, value):
124 if name is not None and value is not None:
125 super(Settings, self).__setattr__(name, value)
127 def load_from_file(self, path):
128 """Update ``settings`` with values found in module at ``path``.
132 custom_settings = imp.load_source('custom_settings', path)
134 for key in dir(custom_settings):
135 if getattr(custom_settings, key) is not None:
136 setattr(self, key, getattr(custom_settings, key))
138 def load_from_dir(self, dir_path):
139 """Update ``settings`` with contents of the .conf files at ``path``.
141 Each file must be named Nfilename.conf, where N is a single or
142 multi-digit decimal number. The files are loaded in ascending order of
143 N - so if a configuration item exists in more that one file the setting
144 in the file with the largest value of N takes precedence.
146 :param dir_path: The full path to the dir from which to load the .conf
151 regex = re.compile("^(?P<digit_part>[0-9]+)(?P<alfa_part>[a-z]?)_.*.conf$")
153 def get_prefix(filename):
155 Provide a suitable function for sort's key arg
157 match_object = regex.search(os.path.basename(filename))
158 return [int(match_object.group('digit_part')),
159 match_object.group('alfa_part')]
161 # get full file path to all files & dirs in dir_path
162 file_paths = os.listdir(dir_path)
163 file_paths = [os.path.join(dir_path, x) for x in file_paths]
165 # filter to get only those that are a files, with a leading
166 # digit and end in '.conf'
167 file_paths = [x for x in file_paths if os.path.isfile(x) and
168 regex.search(os.path.basename(x))]
170 # sort ascending on the leading digits and afla (e.g. 03_, 05a_)
171 file_paths.sort(key=get_prefix)
173 # load settings from each file in turn
174 for filepath in file_paths:
175 self.load_from_file(filepath)
177 def load_from_dict(self, conf):
179 Update ``settings`` with values found in ``conf``.
181 Unlike the other loaders, this is case insensitive.
184 if conf[key] is not None:
185 if isinstance(conf[key], dict):
186 # recursively update dict items, e.g. TEST_PARAMS
187 setattr(self, key.upper(),
188 merge_spec(getattr(self, key.upper()), conf[key]))
190 setattr(self, key.upper(), conf[key])
192 def restore_from_dict(self, conf):
194 Restore ``settings`` with values found in ``conf``.
196 Method will drop all configuration options and restore their
197 values from conf dictionary
199 self.__dict__.clear()
200 tmp_conf = copy.deepcopy(conf)
202 self.setValue(key, tmp_conf[key])
204 def load_from_env(self):
206 Update ``settings`` with values found in the environment.
208 for key in os.environ:
209 setattr(self, key, os.environ[key])
211 def check_test_params(self):
213 Check all parameters defined inside TEST_PARAMS for their
214 existence. In case that non existing vsperf parmeter name
215 is detected, then VSPER will raise a runtime error.
218 for key in settings.getValue('TEST_PARAMS'):
219 if key == 'TEST_PARAMS':
220 raise RuntimeError('It is not allowed to define TEST_PARAMS '
221 'as a test parameter')
222 if key not in self.__dict__ and key not in _EXTRA_TEST_PARAMS:
223 unknown_keys.append(key)
225 if len(unknown_keys):
226 raise RuntimeError('Test parameters contain unknown configuration '
227 'parameter(s): {}'.format(', '.join(unknown_keys)))
229 def check_vm_settings(self, vm_number):
231 Check all VM related settings starting with GUEST_ prefix.
232 If it is not available for defined number of VMs, then vsperf
233 will try to expand it automatically. Expansion is performed
234 also in case that first list item contains a macro.
236 for key in self.__dict__:
237 if key.startswith('GUEST_'):
238 value = self.getValue(key)
239 if isinstance(value, str) and str(value).find('#') >= 0:
240 self._expand_vm_settings(key, 1)
242 if isinstance(value, list):
243 if len(value) < vm_number or str(value[0]).find('#') >= 0:
244 # expand configuration for all VMs
245 self._expand_vm_settings(key, vm_number)
247 def _expand_vm_settings(self, key, vm_number):
249 Expand VM option with given key for given number of VMs
251 tmp_value = self.getValue(key)
252 if isinstance(tmp_value, str):
254 master_value = tmp_value
255 tmp_value = [tmp_value]
258 master_value = tmp_value[0]
260 master_value_str = str(master_value)
261 if master_value_str.find('#') >= 0:
262 self.__dict__[key] = []
263 for vmindex in range(vm_number):
264 value = master_value_str.replace('#VMINDEX', str(vmindex))
265 for macro, args, param, _, step in re.findall(_PARSE_PATTERN, value):
266 multi = int(step) if len(step) and int(step) else 1
268 # pylint: disable=eval-used
269 tmp_result = str(eval(param))
270 elif macro == '#MAC':
271 mac_value = netaddr.EUI(param).value
272 mac = netaddr.EUI(mac_value + vmindex * multi)
273 mac.dialect = netaddr.mac_unix_expanded
274 tmp_result = str(mac)
276 ip_value = netaddr.IPAddress(param).value
277 tmp_result = str(netaddr.IPAddress(ip_value + vmindex * multi))
279 raise RuntimeError('Unknown configuration macro {} in {}'.format(macro, key))
281 value = value.replace("{}{}".format(macro, args), tmp_result)
283 # retype value to original type if needed
284 if not isinstance(master_value, str):
285 value = ast.literal_eval(value)
286 self.__dict__[key].append(value)
288 for vmindex in range(len(tmp_value), vm_number):
289 self.__dict__[key].append(master_value)
292 self.__dict__[key] = self.__dict__[key][0]
294 _LOGGER.debug("Expanding option: %s = %s", key, self.__dict__[key])
297 """Provide settings as a human-readable string.
299 This can be useful for debug.
302 A human-readable string.
305 for key in self.__dict__:
306 tmp_dict[key] = self.getValue(key)
308 return pprint.pformat(tmp_dict)
311 # validation methods used by step driven testcases
313 def validate_getValue(self, result, attr):
314 """Verifies, that correct value was returned
316 # getValue must be called to expand macros and apply
317 # values from TEST_PARAM option
318 assert result == self.getValue(attr)
321 def validate_setValue(self, dummy_result, name, value):
322 """Verifies, that value was correctly set
324 assert value == self.__dict__[name]
327 settings = Settings()
329 def get_test_param(key, default=None):
330 """Retrieve value for test param ``key`` if available.
332 :param key: Key to retrieve from test params.
333 :param default: Default to return if key not found.
335 :returns: Value for ``key`` if found, else ``default``.
337 test_params = settings.getValue('TEST_PARAMS')
338 return test_params.get(key, default) if test_params else default
340 def merge_spec(orig, new):
341 """Merges ``new`` dict with ``orig`` dict, and returns orig.
343 This takes into account nested dictionaries. Example:
345 >>> old = {'foo': 1, 'bar': {'foo': 2, 'bar': 3}}
346 >>> new = {'foo': 6, 'bar': {'foo': 7}}
347 >>> merge_spec(old, new)
348 {'foo': 6, 'bar': {'foo': 7, 'bar': 3}}
350 You'll notice that ``bar.bar`` is not removed. This is the desired result.
356 # Not allowing derived dictionary types for now
357 # pylint: disable=unidiomatic-typecheck
358 if type(orig[key]) == dict:
359 orig[key] = merge_spec(orig[key], new[key])