1 # Copyright 2015-2017 Intel Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Settings and configuration handlers.
17 Settings will be loaded from several .conf files
18 and any user provided settings file.
21 # pylint: disable=invalid-name
31 _LOGGER = logging.getLogger(__name__)
33 # Special test parameters which are not part of standard VSPERF configuration
34 _EXTRA_TEST_PARAMS = ['TUNNEL_TYPE']
36 # regex to parse configuration macros from 04_vnf.conf
37 # it will select all patterns starting with # sign
38 # and returns macro parameters and step
39 # examples of valid macros:
41 # #MAC(AA:BB:CC:DD:EE:FF) or #MAC(AA:BB:CC:DD:EE:FF,2)
42 # #IP(192.168.1.2) or #IP(192.168.1.2,2)
44 _PARSE_PATTERN = r'(#[A-Z]+)(\(([^(),]+)(,([0-9]+))?\))?'
46 class Settings(object):
47 """Holding class for settings.
52 def _eval_param(self, param):
53 # pylint: disable=invalid-name
54 """ Helper function for expansion of references to vsperf parameters
56 if isinstance(param, str):
57 # evaluate every #PARAM reference inside parameter itself
58 macros = re.findall(r'#PARAM\((([\w\-]+)(\[[\w\[\]\-\'\"]+\])*)\)', param)
61 # pylint: disable=eval-used
63 tmp_val = str(eval("self.getValue('{}'){}".format(macro[1], macro[2])))
64 param = param.replace('#PARAM({})'.format(macro[0]), tmp_val)
65 # silently ignore that option required by PARAM macro can't be evaluated;
66 # It is possible, that referred parameter will be constructed during runtime
70 except AttributeError:
73 elif isinstance(param, (list, tuple)):
76 tmp_list.append(self._eval_param(item))
78 elif isinstance(param, dict):
80 for (key, value) in param.items():
81 tmp_dict[key] = self._eval_param(value)
86 def getValue(self, attr):
87 """Return a settings item value
89 if attr in self.__dict__:
90 if attr == 'TEST_PARAMS':
91 return getattr(self, attr)
93 master_value = getattr(self, attr)
94 # Check if parameter value was modified by CLI option
95 cli_value = get_test_param(attr, None)
96 if cli_value is not None:
97 # TRAFFIC dictionary is not overridden by CLI option
98 # but only updated by specified values
100 tmp_value = copy.deepcopy(master_value)
101 tmp_value = merge_spec(tmp_value, cli_value)
102 return self._eval_param(tmp_value)
104 return self._eval_param(cli_value)
106 return self._eval_param(master_value)
108 raise AttributeError("%r object has no attribute %r" %
109 (self.__class__, attr))
111 def __setattr__(self, name, value):
114 # skip non-settings. this should exclude built-ins amongst others
115 if not name.isupper():
118 # we can assume all uppercase keys are valid settings
119 super(Settings, self).__setattr__(name, value)
121 def setValue(self, name, value):
124 if name is not None and value is not None:
125 super(Settings, self).__setattr__(name, value)
127 def resetValue(self, attr):
128 """If parameter was overridden by TEST_PARAMS, then it will
129 be set to its original value.
131 if attr in self.__dict__['TEST_PARAMS']:
132 self.__dict__['TEST_PARAMS'].pop(attr)
134 def load_from_file(self, path):
135 """Update ``settings`` with values found in module at ``path``.
139 custom_settings = imp.load_source('custom_settings', path)
141 for key in dir(custom_settings):
142 if getattr(custom_settings, key) is not None:
143 setattr(self, key, getattr(custom_settings, key))
145 def load_from_dir(self, dir_path):
146 """Update ``settings`` with contents of the .conf files at ``path``.
148 Each file must be named Nfilename.conf, where N is a single or
149 multi-digit decimal number. The files are loaded in ascending order of
150 N - so if a configuration item exists in more that one file the setting
151 in the file with the largest value of N takes precedence.
153 :param dir_path: The full path to the dir from which to load the .conf
158 regex = re.compile("^(?P<digit_part>[0-9]+)(?P<alfa_part>[a-z]?)_.*.conf$")
160 def get_prefix(filename):
162 Provide a suitable function for sort's key arg
164 match_object = regex.search(os.path.basename(filename))
165 return [int(match_object.group('digit_part')),
166 match_object.group('alfa_part')]
168 # get full file path to all files & dirs in dir_path
169 file_paths = os.listdir(dir_path)
170 file_paths = [os.path.join(dir_path, x) for x in file_paths]
172 # filter to get only those that are a files, with a leading
173 # digit and end in '.conf'
174 file_paths = [x for x in file_paths if os.path.isfile(x) and
175 regex.search(os.path.basename(x))]
177 # sort ascending on the leading digits and afla (e.g. 03_, 05a_)
178 file_paths.sort(key=get_prefix)
180 # load settings from each file in turn
181 for filepath in file_paths:
182 self.load_from_file(filepath)
184 def load_from_dict(self, conf):
186 Update ``settings`` with values found in ``conf``.
188 Unlike the other loaders, this is case insensitive.
191 if conf[key] is not None:
192 if isinstance(conf[key], dict):
193 # recursively update dict items, e.g. TEST_PARAMS
194 setattr(self, key.upper(),
195 merge_spec(getattr(self, key.upper()), conf[key]))
197 setattr(self, key.upper(), conf[key])
199 def restore_from_dict(self, conf):
201 Restore ``settings`` with values found in ``conf``.
203 Method will drop all configuration options and restore their
204 values from conf dictionary
206 self.__dict__.clear()
207 tmp_conf = copy.deepcopy(conf)
209 self.setValue(key, tmp_conf[key])
211 def load_from_env(self):
213 Update ``settings`` with values found in the environment.
215 for key in os.environ:
216 setattr(self, key, os.environ[key])
218 def check_test_params(self):
220 Check all parameters defined inside TEST_PARAMS for their
221 existence. In case that non existing vsperf parmeter name
222 is detected, then VSPER will raise a runtime error.
225 for key in settings.getValue('TEST_PARAMS'):
226 if key == 'TEST_PARAMS':
227 raise RuntimeError('It is not allowed to define TEST_PARAMS '
228 'as a test parameter')
229 if key not in self.__dict__ and key not in _EXTRA_TEST_PARAMS:
230 unknown_keys.append(key)
233 raise RuntimeError('Test parameters contain unknown configuration '
234 'parameter(s): {}'.format(', '.join(unknown_keys)))
236 def check_vm_settings(self, vm_number):
238 Check all VM related settings starting with GUEST_ prefix.
239 If it is not available for defined number of VMs, then vsperf
240 will try to expand it automatically. Expansion is performed
241 also in case that first list item contains a macro.
243 for key in self.__dict__:
244 if key.startswith('GUEST_'):
245 value = self.getValue(key)
246 if isinstance(value, str) and str(value).find('#') >= 0:
247 self._expand_vm_settings(key, 1)
249 if isinstance(value, list):
250 if len(value) < vm_number or str(value[0]).find('#') >= 0:
251 # expand configuration for all VMs
252 self._expand_vm_settings(key, vm_number)
254 def _expand_vm_settings(self, key, vm_number):
256 Expand VM option with given key for given number of VMs
258 tmp_value = self.getValue(key)
259 if isinstance(tmp_value, str):
261 master_value = tmp_value
262 tmp_value = [tmp_value]
265 master_value = tmp_value[0]
267 master_value_str = str(master_value)
268 if master_value_str.find('#') >= 0:
269 self.__dict__[key] = []
270 for vmindex in range(vm_number):
271 value = master_value_str.replace('#VMINDEX', str(vmindex))
272 for macro, args, param, _, step in re.findall(_PARSE_PATTERN, value):
273 multi = int(step) if step and int(step) else 1
275 # pylint: disable=eval-used
276 tmp_result = str(eval(param))
277 elif macro == '#MAC':
278 mac_value = netaddr.EUI(param).value
279 mac = netaddr.EUI(mac_value + vmindex * multi)
280 mac.dialect = netaddr.mac_unix_expanded
281 tmp_result = str(mac)
283 ip_value = netaddr.IPAddress(param).value
284 tmp_result = str(netaddr.IPAddress(ip_value + vmindex * multi))
286 raise RuntimeError('Unknown configuration macro {} in {}'.format(macro, key))
288 value = value.replace("{}{}".format(macro, args), tmp_result)
290 # retype value to original type if needed
291 if not isinstance(master_value, str):
292 value = ast.literal_eval(value)
293 self.__dict__[key].append(value)
295 for vmindex in range(len(tmp_value), vm_number):
296 self.__dict__[key].append(master_value)
299 self.__dict__[key] = self.__dict__[key][0]
301 _LOGGER.debug("Expanding option: %s = %s", key, self.__dict__[key])
304 """Provide settings as a human-readable string.
306 This can be useful for debug.
309 A human-readable string.
312 for key in self.__dict__:
313 tmp_dict[key] = self.getValue(key)
315 return pprint.pformat(tmp_dict)
318 # validation methods used by step driven testcases
320 def validate_getValue(self, result, attr):
321 """Verifies, that correct value was returned
323 # getValue must be called to expand macros and apply
324 # values from TEST_PARAM option
325 assert result == self.getValue(attr)
328 def validate_setValue(self, _dummy_result, name, value):
329 """Verifies, that value was correctly set
331 assert value == self.__dict__[name]
334 def validate_resetValue(self, _dummy_result, attr):
335 """Verifies, that value was correctly reset
337 return 'TEST_PARAMS' not in self.__dict__ or \
338 attr not in self.__dict__['TEST_PARAMS']
340 settings = Settings()
342 def get_test_param(key, default=None):
343 """Retrieve value for test param ``key`` if available.
345 :param key: Key to retrieve from test params.
346 :param default: Default to return if key not found.
348 :returns: Value for ``key`` if found, else ``default``.
350 test_params = settings.getValue('TEST_PARAMS')
351 return test_params.get(key, default) if test_params else default
353 def merge_spec(orig, new):
354 """Merges ``new`` dict with ``orig`` dict, and returns orig.
356 This takes into account nested dictionaries. Example:
358 >>> old = {'foo': 1, 'bar': {'foo': 2, 'bar': 3}}
359 >>> new = {'foo': 6, 'bar': {'foo': 7}}
360 >>> merge_spec(old, new)
361 {'foo': 6, 'bar': {'foo': 7, 'bar': 3}}
363 You'll notice that ``bar.bar`` is not removed. This is the desired result.
369 # Not allowing derived dictionary types for now
370 # pylint: disable=unidiomatic-typecheck
371 if type(orig[key]) == dict:
372 orig[key] = merge_spec(orig[key], new[key])