1 # Copyright 2015-2016 Intel Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Settings and configuration handlers.
17 Settings will be loaded from several .conf files
18 and any user provided settings file.
21 # pylint: disable=invalid-name
31 _LOGGER = logging.getLogger(__name__)
33 # Special test parameters which are not part of standard VSPERF configuration
34 _EXTRA_TEST_PARAMS = ['TUNNEL_TYPE']
36 # regex to parse configuration macros from 04_vnf.conf
37 # it will select all patterns starting with # sign
38 # and returns macro parameters and step
39 # examples of valid macros:
41 # #MAC(AA:BB:CC:DD:EE:FF) or #MAC(AA:BB:CC:DD:EE:FF,2)
42 # #IP(192.168.1.2) or #IP(192.168.1.2,2)
44 _PARSE_PATTERN = r'(#[A-Z]+)(\(([^(),]+)(,([0-9]+))?\))?'
46 class Settings(object):
47 """Holding class for settings.
52 def getValue(self, attr):
53 """Return a settings item value
55 if attr in self.__dict__:
56 if attr == 'TEST_PARAMS':
57 return getattr(self, attr)
59 master_value = getattr(self, attr)
60 # Check if parameter value was modified by CLI option
61 cli_value = get_test_param(attr, None)
63 # TRAFFIC dictionary is not overridden by CLI option
64 # but only updated by specified values
66 tmp_value = copy.deepcopy(master_value)
67 tmp_value = merge_spec(tmp_value, cli_value)
74 raise AttributeError("%r object has no attribute %r" %
75 (self.__class__, attr))
77 def __setattr__(self, name, value):
80 # skip non-settings. this should exclude built-ins amongst others
81 if not name.isupper():
84 # we can assume all uppercase keys are valid settings
85 super(Settings, self).__setattr__(name, value)
87 def setValue(self, name, value):
90 if name is not None and value is not None:
91 super(Settings, self).__setattr__(name, value)
93 def load_from_file(self, path):
94 """Update ``settings`` with values found in module at ``path``.
98 custom_settings = imp.load_source('custom_settings', path)
100 for key in dir(custom_settings):
101 if getattr(custom_settings, key) is not None:
102 setattr(self, key, getattr(custom_settings, key))
104 def load_from_dir(self, dir_path):
105 """Update ``settings`` with contents of the .conf files at ``path``.
107 Each file must be named Nfilename.conf, where N is a single or
108 multi-digit decimal number. The files are loaded in ascending order of
109 N - so if a configuration item exists in more that one file the setting
110 in the file with the largest value of N takes precedence.
112 :param dir_path: The full path to the dir from which to load the .conf
117 regex = re.compile("^(?P<digit_part>[0-9]+).*.conf$")
119 def get_prefix(filename):
121 Provide a suitable function for sort's key arg
123 match_object = regex.search(os.path.basename(filename))
124 return int(match_object.group('digit_part'))
126 # get full file path to all files & dirs in dir_path
127 file_paths = os.listdir(dir_path)
128 file_paths = [os.path.join(dir_path, x) for x in file_paths]
130 # filter to get only those that are a files, with a leading
131 # digit and end in '.conf'
132 file_paths = [x for x in file_paths if os.path.isfile(x) and
133 regex.search(os.path.basename(x))]
135 # sort ascending on the leading digits
136 file_paths.sort(key=get_prefix)
138 # load settings from each file in turn
139 for filepath in file_paths:
140 self.load_from_file(filepath)
142 def load_from_dict(self, conf):
144 Update ``settings`` with values found in ``conf``.
146 Unlike the other loaders, this is case insensitive.
149 if conf[key] is not None:
150 if isinstance(conf[key], dict):
151 # recursively update dict items, e.g. TEST_PARAMS
152 setattr(self, key.upper(),
153 merge_spec(getattr(self, key.upper()), conf[key]))
155 setattr(self, key.upper(), conf[key])
157 def load_from_env(self):
159 Update ``settings`` with values found in the environment.
161 for key in os.environ:
162 setattr(self, key, os.environ[key])
164 def check_test_params(self):
166 Check all parameters defined inside TEST_PARAMS for their
167 existence. In case that non existing vsperf parmeter name
168 is detected, then VSPER will raise a runtime error.
171 for key in settings.getValue('TEST_PARAMS'):
172 if key == 'TEST_PARAMS':
173 raise RuntimeError('It is not allowed to define TEST_PARAMS '
174 'as a test parameter')
175 if key not in self.__dict__ and key not in _EXTRA_TEST_PARAMS:
176 unknown_keys.append(key)
178 if len(unknown_keys):
179 raise RuntimeError('Test parameters contain unknown configuration '
180 'parameter(s): {}'.format(', '.join(unknown_keys)))
182 def check_vm_settings(self, vm_number):
184 Check all VM related settings starting with GUEST_ prefix.
185 If it is not available for defined number of VMs, then vsperf
186 will try to expand it automatically. Expansion is performed
187 also in case that first list item contains a macro.
189 for key in self.__dict__:
190 if key.startswith('GUEST_'):
191 value = self.getValue(key)
192 if isinstance(value, str) and value.find('#') >= 0:
193 self._expand_vm_settings(key, 1)
195 if isinstance(value, list):
196 if len(value) < vm_number or str(value[0]).find('#') >= 0:
197 # expand configuration for all VMs
198 self._expand_vm_settings(key, vm_number)
200 def _expand_vm_settings(self, key, vm_number):
202 Expand VM option with given key for given number of VMs
204 tmp_value = self.getValue(key)
205 if isinstance(tmp_value, str):
207 master_value = tmp_value
208 tmp_value = [tmp_value]
211 master_value = tmp_value[0]
213 master_value_str = str(master_value)
214 if master_value_str.find('#') >= 0:
215 self.__dict__[key] = []
216 for vmindex in range(vm_number):
217 value = master_value_str.replace('#VMINDEX', str(vmindex))
218 for macro, args, param, _, step in re.findall(_PARSE_PATTERN, value):
219 multi = int(step) if len(step) and int(step) else 1
221 # pylint: disable=eval-used
222 tmp_result = str(eval(param))
223 elif macro == '#MAC':
224 mac_value = netaddr.EUI(param).value
225 mac = netaddr.EUI(mac_value + vmindex * multi)
226 mac.dialect = netaddr.mac_unix_expanded
227 tmp_result = str(mac)
229 ip_value = netaddr.IPAddress(param).value
230 tmp_result = str(netaddr.IPAddress(ip_value + vmindex * multi))
232 raise RuntimeError('Unknown configuration macro {} in {}'.format(macro, key))
234 value = value.replace("{}{}".format(macro, args), tmp_result)
236 # retype value to original type if needed
237 if not isinstance(master_value, str):
238 value = ast.literal_eval(value)
239 self.__dict__[key].append(value)
241 for vmindex in range(len(tmp_value), vm_number):
242 self.__dict__[key].append(master_value)
245 self.__dict__[key] = self.__dict__[key][0]
247 _LOGGER.debug("Expanding option: %s = %s", key, self.__dict__[key])
250 """Provide settings as a human-readable string.
252 This can be useful for debug.
255 A human-readable string.
258 for key in self.__dict__:
259 tmp_dict[key] = self.getValue(key)
261 return pprint.pformat(tmp_dict)
264 # validation methods used by step driven testcases
266 def validate_getValue(self, result, attr):
267 """Verifies, that correct value was returned
269 assert result == self.__dict__[attr]
272 def validate_setValue(self, dummy_result, name, value):
273 """Verifies, that value was correctly set
275 assert value == self.__dict__[name]
278 settings = Settings()
280 def get_test_param(key, default=None):
281 """Retrieve value for test param ``key`` if available.
283 :param key: Key to retrieve from test params.
284 :param default: Default to return if key not found.
286 :returns: Value for ``key`` if found, else ``default``.
288 test_params = settings.getValue('TEST_PARAMS')
289 return test_params.get(key, default) if test_params else default
291 def merge_spec(orig, new):
292 """Merges ``new`` dict with ``orig`` dict, and returns orig.
294 This takes into account nested dictionaries. Example:
296 >>> old = {'foo': 1, 'bar': {'foo': 2, 'bar': 3}}
297 >>> new = {'foo': 6, 'bar': {'foo': 7}}
298 >>> merge_spec(old, new)
299 {'foo': 6, 'bar': {'foo': 7, 'bar': 3}}
301 You'll notice that ``bar.bar`` is not removed. This is the desired result.
307 # Not allowing derived dictionary types for now
308 # pylint: disable=unidiomatic-typecheck
309 if type(orig[key]) == dict:
310 orig[key] = merge_spec(orig[key], new[key])