1 # Copyright 2015-2017 Intel Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Settings and configuration handlers.
17 Settings will be loaded from several .conf files
18 and any user provided settings file.
21 # pylint: disable=invalid-name
_LOGGER = logging.getLogger(__name__)

# Special test parameters which are not part of standard VSPERF configuration
_EXTRA_TEST_PARAMS = ['TUNNEL_TYPE']

# Regex to parse configuration macros from 04_vnf.conf.
# It selects every pattern made of a # sign followed by uppercase letters,
# optionally followed by "(parameter)" or "(parameter,step)".
# Examples of valid macros:
#   #VMINDEX
#   #MAC(AA:BB:CC:DD:EE:FF) or #MAC(AA:BB:CC:DD:EE:FF,2)
#   #IP(192.168.1.2) or #IP(192.168.1.2,2)
# Capture groups: 1 = macro name (incl. the # sign), 2 = whole argument text
# incl. parentheses, 3 = parameter, 4 = ",step", 5 = step digits only.
_PARSE_PATTERN = r'(#[A-Z]+)(\(([^(),]+)(,([0-9]+))?\))?'
class Settings(object):
    """Holding class for settings.

    Every setting is stored as an upper-case instance attribute. Values
    should be read through :meth:`getValue`, which applies overrides found
    in ``TEST_PARAMS`` and expands ``#PARAM()`` references.
    """

    def _eval_param(self, param):
        # pylint: disable=invalid-name
        """Expand references to other vsperf parameters inside ``param``.

        Strings are scanned for ``#PARAM(NAME)`` / ``#PARAM(NAME[index])``
        macros, each of which is replaced by the string form of the referred
        value. Lists, tuples and dicts are processed recursively; any other
        type is returned untouched.

        :param param: Value to be expanded.

        :returns: Expanded value. Lists and tuples both come back as a new
            list, dicts as a new dict.
        """
        if isinstance(param, str):
            # evaluate every #PARAM reference inside parameter itself
            macros = re.findall(r'#PARAM\((([\w\-]+)(\[[\w\[\]\-\'\"]+\])*)\)', param)
            for reference, name, index in macros:
                try:
                    # NOTE(review): eval() runs text taken from configuration;
                    # the regex above limits it to a name plus index
                    # expressions, but configuration must stay trusted.
                    # eval() has to stay in the method scope, because the
                    # evaluated expression refers to ``self``.
                    # pylint: disable=eval-used
                    tmp_val = str(eval("self.getValue('{}'){}".format(name, index)))
                    param = param.replace('#PARAM({})'.format(reference), tmp_val)
                # silently ignore that option required by PARAM macro can't be
                # evaluated; it is possible, that referred parameter will be
                # constructed during runtime
                except (IndexError, AttributeError):
                    pass
            return param

        if isinstance(param, (list, tuple)):
            return [self._eval_param(item) for item in param]

        if isinstance(param, dict):
            return {key: self._eval_param(value) for (key, value) in param.items()}

        return param

    def getValue(self, attr):
        """Return a settings item value.

        :param attr: Name of the setting.

        :returns: ``TEST_PARAMS`` is returned exactly as stored. For any
            other setting the value found in ``TEST_PARAMS`` (if any) takes
            precedence over the stored one, and ``#PARAM()`` references are
            expanded before the value is returned.

        :raises AttributeError: If the setting is not defined.
        """
        if attr not in self.__dict__:
            raise AttributeError("%r object has no attribute %r" %
                                 (self.__class__, attr))

        master_value = getattr(self, attr)
        if attr == 'TEST_PARAMS':
            # returned as is; get_test_param() below reads TEST_PARAMS through
            # this very method, so it must not look for overrides itself
            return master_value

        # Check if parameter value was modified by CLI option
        cli_value = get_test_param(attr, None)
        if cli_value is None:
            return self._eval_param(master_value)

        if attr == 'TRAFFIC':
            # TRAFFIC dictionary is not overridden by CLI option
            # but only updated by specified values; the stored dictionary
            # is copied first, so it is not modified by the merge
            tmp_value = merge_spec(copy.deepcopy(master_value), cli_value)
            return self._eval_param(tmp_value)

        return self._eval_param(cli_value)

    def __setattr__(self, name, value):
        """Set a value; names which are not upper-case are silently ignored.
        """
        # skip non-settings. this should exclude built-ins amongst others
        if not name.isupper():
            return

        # we can assume all uppercase keys are valid settings
        super(Settings, self).__setattr__(name, value)

    def setValue(self, name, value):
        """Set a value.

        Nothing is stored if either ``name`` or ``value`` is None. Unlike
        plain attribute assignment, the upper-case check is not applied.
        """
        if name is not None and value is not None:
            super(Settings, self).__setattr__(name, value)

    @staticmethod
    def _load_source(name, path):
        """Execute python source file at ``path`` as module ``name``.

        Replacement for ``imp.load_source``, which is not available since
        Python 3.12. If module ``name`` was loaded before, then the file is
        executed inside the existing module, so names defined by previously
        loaded files stay visible to the file being loaded.

        :param name: Module name used as a key in ``sys.modules``.
        :param path: Path to the file; any file extension is accepted.

        :returns: The module object.
        """
        import importlib.machinery
        import importlib.util
        import sys

        loader = importlib.machinery.SourceFileLoader(name, path)
        spec = importlib.util.spec_from_file_location(name, path, loader=loader)
        module = sys.modules.get(name)
        if module is None:
            module = importlib.util.module_from_spec(spec)
            sys.modules[name] = module
            try:
                loader.exec_module(module)
            except BaseException:
                # do not leave a half-initialised module behind
                sys.modules.pop(name, None)
                raise
        else:
            # refresh file related attributes, e.g. for code using __file__
            module.__spec__ = spec
            module.__loader__ = loader
            module.__file__ = path
            loader.exec_module(module)
        return module

    def load_from_file(self, path):
        """Update ``settings`` with values found in module at ``path``.

        Every module level name whose value is not None is assigned to this
        object; names which are not upper-case are dropped by
        :meth:`__setattr__`.
        """
        custom_settings = self._load_source('custom_settings', path)

        for key in dir(custom_settings):
            value = getattr(custom_settings, key)
            if value is not None:
                setattr(self, key, value)

    def load_from_dir(self, dir_path):
        """Update ``settings`` with contents of the .conf files at ``path``.

        Each file must be named Nfilename.conf, where N is a single or
        multi-digit decimal number. The files are loaded in ascending order of
        N - so if a configuration item exists in more than one file the setting
        in the file with the largest value of N takes precedence.

        :param dir_path: The full path to the dir from which to load the .conf
            files.

        :returns: None
        """
        # the dot in front of "conf" is escaped, so only a real ".conf"
        # extension is accepted
        regex = re.compile(r"^(?P<digit_part>[0-9]+)(?P<alfa_part>[a-z]?)_.*\.conf$")

        def get_prefix(filename):
            """
            Provide a suitable function for sort's key arg
            """
            match_object = regex.search(os.path.basename(filename))
            return [int(match_object.group('digit_part')),
                    match_object.group('alfa_part')]

        # get full file path to all files & dirs in dir_path
        candidates = [os.path.join(dir_path, x) for x in os.listdir(dir_path)]

        # filter to get only those that are files, with a leading
        # digit and end in '.conf'
        file_paths = [x for x in candidates
                      if os.path.isfile(x) and regex.search(os.path.basename(x))]

        # sort ascending on the leading digits and alfa (e.g. 03_, 05a_)
        file_paths.sort(key=get_prefix)

        # load settings from each file in turn
        for filepath in file_paths:
            self.load_from_file(filepath)

    def load_from_dict(self, conf):
        """
        Update ``settings`` with values found in ``conf``.

        Unlike the other loaders, this is case insensitive. Items whose value
        is None are skipped. A dictionary value is merged into an already
        defined dictionary setting; in any other case the value is stored
        as is.
        """
        for key in conf:
            value = conf[key]
            if value is None:
                continue

            name = key.upper()
            current = getattr(self, name, None)
            if isinstance(value, dict) and isinstance(current, dict):
                # recursively update dict items, e.g. TEST_PARAMS
                value = merge_spec(current, value)
            setattr(self, name, value)

    def load_from_env(self):
        """
        Update ``settings`` with values found in the environment.

        Only upper-case variables are stored, see :meth:`__setattr__`.
        """
        for key, value in os.environ.items():
            setattr(self, key, value)

    def check_test_params(self):
        """
        Check all parameters defined inside TEST_PARAMS for their
        existence. In case that non existing vsperf parameter name
        is detected, then VSPERF will raise a runtime error.

        :raises RuntimeError: If TEST_PARAMS contains key TEST_PARAMS or
            a key, which is neither a defined setting nor listed
            in _EXTRA_TEST_PARAMS.
        """
        unknown_keys = []
        # TEST_PARAMS of this object are checked, not of a global instance
        for key in self.getValue('TEST_PARAMS'):
            if key == 'TEST_PARAMS':
                raise RuntimeError('It is not allowed to define TEST_PARAMS '
                                   'as a test parameter')
            if key not in self.__dict__ and key not in _EXTRA_TEST_PARAMS:
                unknown_keys.append(key)

        if unknown_keys:
            raise RuntimeError('Test parameters contain unknown configuration '
                               'parameter(s): {}'.format(', '.join(unknown_keys)))

    def check_vm_settings(self, vm_number):
        """
        Check all VM related settings starting with GUEST_ prefix.
        If it is not available for defined number of VMs, then vsperf
        will try to expand it automatically. Expansion is performed
        also in case that first list item contains a macro.

        :param vm_number: Number of VMs the settings must be available for.
        """
        # iterate over a snapshot of names, as values are replaced inside
        for key in list(self.__dict__):
            if not key.startswith('GUEST_'):
                continue

            value = self.getValue(key)
            if isinstance(value, str):
                # scalar value is expanded only if it contains a macro
                if '#' in value:
                    self._expand_vm_settings(key, 1)
            elif isinstance(value, list):
                if len(value) < vm_number or '#' in str(value[0]):
                    # expand configuration for all VMs
                    self._expand_vm_settings(key, vm_number)

    def _expand_vm_settings(self, key, vm_number):
        """
        Expand VM option with given key for given number of VMs

        If the first item contains a macro, then the whole option is
        regenerated from that item. Otherwise the stored list is padded by
        its first item up to ``vm_number`` items.

        :param key: Name of the setting to be expanded.
        :param vm_number: Number of VMs.

        :raises RuntimeError: If an unknown macro is found.
        """
        tmp_value = self.getValue(key)
        if isinstance(tmp_value, str):
            scalar = True
            master_value = tmp_value
            tmp_value = [tmp_value]
        else:
            scalar = False
            master_value = tmp_value[0]

        master_value_str = str(master_value)
        if '#' in master_value_str:
            self.__dict__[key] = []
            for vmindex in range(vm_number):
                value = master_value_str.replace('#VMINDEX', str(vmindex))
                for macro, args, param, _, step in re.findall(_PARSE_PATTERN, value):
                    # missing or zero step means step 1
                    multi = int(step) if step and int(step) else 1
                    if macro == '#EVAL':
                        # NOTE(review): eval() runs text taken from
                        # configuration; configuration must stay trusted
                        # pylint: disable=eval-used
                        tmp_result = str(eval(param))
                    elif macro == '#MAC':
                        mac_value = netaddr.EUI(param).value
                        mac = netaddr.EUI(mac_value + vmindex * multi)
                        mac.dialect = netaddr.mac_unix_expanded
                        tmp_result = str(mac)
                    elif macro == '#IP':
                        ip_value = netaddr.IPAddress(param).value
                        tmp_result = str(netaddr.IPAddress(ip_value + vmindex * multi))
                    else:
                        raise RuntimeError('Unknown configuration macro {} in {}'.format(macro, key))

                    value = value.replace("{}{}".format(macro, args), tmp_result)

                # retype value to original type if needed
                if not isinstance(master_value, str):
                    value = ast.literal_eval(value)
                self.__dict__[key].append(value)
        else:
            for vmindex in range(len(tmp_value), vm_number):
                self.__dict__[key].append(master_value)

        if scalar:
            self.__dict__[key] = self.__dict__[key][0]

        _LOGGER.debug("Expanding option: %s = %s", key, self.__dict__[key])

    def __str__(self):
        """Provide settings as a human-readable string.

        This can be useful for debug.

        Returns:
            A human-readable string.
        """
        tmp_dict = {key: self.getValue(key) for key in self.__dict__}

        return pprint.pformat(tmp_dict)

    #
    # validation methods used by step driven testcases
    #
    def validate_getValue(self, result, attr):
        """Verifies, that correct value was returned

        :raises AssertionError: If ``result`` differs from current value.
        """
        # getValue must be called to expand macros and apply
        # values from TEST_PARAM option
        assert result == self.getValue(attr)
        return True

    def validate_setValue(self, dummy_result, name, value):
        """Verifies, that value was correctly set

        :raises AssertionError: If stored value differs from ``value``.
        """
        assert value == self.__dict__[name]
        return True
# Module-level Settings instance; get_test_param() reads TEST_PARAMS from it.
settings = Settings()
def get_test_param(key, default=None):
    """Look up ``key`` among the test parameters.

    The test parameters are read from setting ``TEST_PARAMS`` of the module
    level ``settings`` object.

    :param key: Key to retrieve from test params.
    :param default: Default to return if key not found.

    :returns: Value for ``key`` if found, else ``default``. ``default`` is
        also returned if ``TEST_PARAMS`` is empty.
    """
    params = settings.getValue('TEST_PARAMS')
    if not params:
        return default
    return params.get(key, default)
def merge_spec(orig, new):
    """Merges ``new`` dict with ``orig`` dict, and returns orig.

    This takes into account nested dictionaries. Example:

        >>> old = {'foo': 1, 'bar': {'foo': 2, 'bar': 3}}
        >>> new = {'foo': 6, 'bar': {'foo': 7}}
        >>> merge_spec(old, new)
        {'foo': 6, 'bar': {'foo': 7, 'bar': 3}}

    You'll notice that ``bar.bar`` is not removed. This is the desired result.

    A nested dictionary is merged only if both values are dictionaries; in
    any other case the value from ``new`` replaces the value in ``orig``.

    :param orig: Dictionary to be updated; it is modified in place.
    :param new: Dictionary with values to be applied.

    :returns: ``orig``
    """
    for key in new:
        if key not in orig:
            orig[key] = new[key]
            continue

        # Not allowing derived dictionary types for now
        # pylint: disable=unidiomatic-typecheck
        if type(orig[key]) is dict and isinstance(new[key], dict):
            orig[key] = merge_spec(orig[key], new[key])
        else:
            # covers also replacement of a dictionary by a non-dictionary
            # value, which must not be iterated as if it was a dictionary
            orig[key] = new[key]

    return orig