Prox L2FWD multiflow test fix
[yardstick.git] / yardstick / common / utils.py
1 # Copyright 2013: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 import collections
17 from contextlib import closing
18 import datetime
19 import errno
20 import importlib
21 import ipaddress
22 import logging
23 import os
24 import pydoc
25 import random
26 import re
27 import signal
28 import socket
29 import subprocess
30 import sys
31 import time
32 import threading
33
34 import six
35 from flask import jsonify
36 from six.moves import configparser
37 from oslo_serialization import jsonutils
38 from oslo_utils import encodeutils
39
40 import yardstick
41 from yardstick.common import exceptions
42
43
44 logger = logging.getLogger(__name__)
45 logger.setLevel(logging.DEBUG)
46
47
48 # Decorator for cli-args
49 def cliargs(*args, **kwargs):
50     def _decorator(func):
51         func.__dict__.setdefault('arguments', []).insert(0, (args, kwargs))
52         return func
53     return _decorator
54
55
56 def itersubclasses(cls, _seen=None):
57     """Generator over all subclasses of a given class in depth first order."""
58
59     if not isinstance(cls, type):
60         raise TypeError("itersubclasses must be called with "
61                         "new-style classes, not %.100r" % cls)
62     _seen = _seen or set()
63     try:
64         subs = cls.__subclasses__()
65     except TypeError:   # fails only when cls is type
66         subs = cls.__subclasses__(cls)
67     for sub in subs:
68         if sub not in _seen:
69             _seen.add(sub)
70             yield sub
71             for sub in itersubclasses(sub, _seen):
72                 yield sub
73
74
75 def import_modules_from_package(package, raise_exception=False):
76     """Import modules given a package name
77
78     :param: package - Full package name. For example: rally.deploy.engines
79     """
80     yardstick_root = os.path.dirname(os.path.dirname(yardstick.__file__))
81     path = os.path.join(yardstick_root, *package.split('.'))
82     for root, _, files in os.walk(path):
83         matches = (filename for filename in files if filename.endswith('.py')
84                    and not filename.startswith('__'))
85         new_package = os.path.relpath(root, yardstick_root).replace(os.sep,
86                                                                     '.')
87         module_names = set(
88             '{}.{}'.format(new_package, filename.rsplit('.py', 1)[0])
89             for filename in matches)
90         # Find modules which haven't already been imported
91         missing_modules = module_names.difference(sys.modules)
92         logger.debug('Importing modules: %s', missing_modules)
93         for module_name in missing_modules:
94             try:
95                 importlib.import_module(module_name)
96             except (ImportError, SyntaxError) as exc:
97                 if raise_exception:
98                     raise exc
99                 logger.exception('Unable to import module %s', module_name)
100
101
102 NON_NONE_DEFAULT = object()
103
104
105 def get_key_with_default(data, key, default=NON_NONE_DEFAULT):
106     value = data.get(key, default)
107     if value is NON_NONE_DEFAULT:
108         raise KeyError(key)
109     return value
110
111
112 def make_dict_from_map(data, key_map):
113     return {dest_key: get_key_with_default(data, src_key, default)
114             for dest_key, (src_key, default) in key_map.items()}
115
116
117 def makedirs(d):
118     try:
119         os.makedirs(d)
120     except OSError as e:
121         if e.errno != errno.EEXIST:
122             raise
123
124
125 def remove_file(path):
126     try:
127         os.remove(path)
128     except OSError as e:
129         if e.errno != errno.ENOENT:
130             raise
131
132
133 def execute_command(cmd, **kwargs):
134     exec_msg = "Executing command: '%s'" % cmd
135     logger.debug(exec_msg)
136
137     output = subprocess.check_output(cmd.split(), **kwargs)
138     return encodeutils.safe_decode(output, incoming='utf-8').split(os.linesep)
139
140
141 def source_env(env_file):
142     p = subprocess.Popen(". %s; env" % env_file, stdout=subprocess.PIPE,
143                          shell=True)
144     output = p.communicate()[0]
145
146     # sometimes output type would be binary_type, and it don't have splitlines
147     # method, so we need to decode
148     if isinstance(output, six.binary_type):
149         output = encodeutils.safe_decode(output)
150     env = dict(line.split('=', 1) for line in output.splitlines() if '=' in line)
151     os.environ.update(env)
152     return env
153
154
155 def read_json_from_file(path):
156     with open(path, 'r') as f:
157         j = f.read()
158     # don't use jsonutils.load() it conflicts with already decoded input
159     return jsonutils.loads(j)
160
161
162 def write_json_to_file(path, data, mode='w'):
163     with open(path, mode) as f:
164         jsonutils.dump(data, f)
165
166
167 def write_file(path, data, mode='w'):
168     with open(path, mode) as f:
169         f.write(data)
170
171
172 def parse_ini_file(path):
173     parser = configparser.ConfigParser()
174
175     try:
176         files = parser.read(path)
177     except configparser.MissingSectionHeaderError:
178         logger.exception('invalid file type')
179         raise
180     else:
181         if not files:
182             raise RuntimeError('file not exist')
183
184     try:
185         default = {k: v for k, v in parser.items('DEFAULT')}
186     except configparser.NoSectionError:
187         default = {}
188
189     config = dict(DEFAULT=default,
190                   **{s: {k: v for k, v in parser.items(
191                       s)} for s in parser.sections()})
192
193     return config
194
195
196 def get_port_mac(sshclient, port):
197     cmd = "ifconfig |grep HWaddr |grep %s |awk '{print $5}' " % port
198     _, stdout, _ = sshclient.execute(cmd, raise_on_error=True)
199
200     return stdout.rstrip()
201
202
203 def get_port_ip(sshclient, port):
204     cmd = "ifconfig %s |grep 'inet addr' |awk '{print $2}' " \
205         "|cut -d ':' -f2 " % port
206     _, stdout, _ = sshclient.execute(cmd, raise_on_error=True)
207
208     return stdout.rstrip()
209
210
211 def flatten_dict_key(data):
212     next_data = {}
213
214     # use list, because iterable is too generic
215     if not any(isinstance(v, (collections.Mapping, list))
216                for v in data.values()):
217         return data
218
219     for k, v in data.items():
220         if isinstance(v, collections.Mapping):
221             for n_k, n_v in v.items():
222                 next_data["%s.%s" % (k, n_k)] = n_v
223         # use list because iterable is too generic
224         elif isinstance(v, collections.Iterable) and not isinstance(v, six.string_types):
225             for index, item in enumerate(v):
226                 next_data["%s%d" % (k, index)] = item
227         else:
228             next_data[k] = v
229
230     return flatten_dict_key(next_data)
231
232
233 def translate_to_str(obj):
234     if isinstance(obj, collections.Mapping):
235         return {str(k): translate_to_str(v) for k, v in obj.items()}
236     elif isinstance(obj, list):
237         return [translate_to_str(ele) for ele in obj]
238     elif isinstance(obj, six.text_type):
239         return str(obj)
240     return obj
241
242
243 def result_handler(status, data):
244     result = {
245         'status': status,
246         'result': data
247     }
248     return jsonify(result)
249
250
251 def change_obj_to_dict(obj):
252     dic = {}
253     for k, v in vars(obj).items():
254         try:
255             vars(v)
256         except TypeError:
257             dic.update({k: v})
258     return dic
259
260
261 def set_dict_value(dic, keys, value):
262     return_dic = dic
263
264     for key in keys.split('.'):
265         return_dic.setdefault(key, {})
266         if key == keys.split('.')[-1]:
267             return_dic[key] = value
268         else:
269             return_dic = return_dic[key]
270     return dic
271
272
273 def get_free_port(ip):
274     with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
275         port = random.randint(5000, 10000)
276         while s.connect_ex((ip, port)) == 0:
277             port = random.randint(5000, 10000)
278         return port
279
280
281 def mac_address_to_hex_list(mac):
282     try:
283         octets = ["0x{:02x}".format(int(elem, 16)) for elem in mac.split(':')]
284     except ValueError:
285         raise exceptions.InvalidMacAddress(mac_address=mac)
286     if len(octets) != 6 or all(len(octet) != 4 for octet in octets):
287         raise exceptions.InvalidMacAddress(mac_address=mac)
288     return octets
289
290
291 def make_ipv4_address(ip_addr):
292     return ipaddress.IPv4Address(six.text_type(ip_addr))
293
294
295 def safe_ip_address(ip_addr):
296     """ get ip address version v6 or v4 """
297     try:
298         return ipaddress.ip_address(six.text_type(ip_addr))
299     except ValueError:
300         logging.error("%s is not valid", ip_addr)
301         return None
302
303
304 def get_ip_version(ip_addr):
305     """ get ip address version v6 or v4 """
306     try:
307         address = ipaddress.ip_address(six.text_type(ip_addr))
308     except ValueError:
309         logging.error("%s is not valid", ip_addr)
310         return None
311     else:
312         return address.version
313
314
315 def make_ip_addr(ip, mask):
316     """
317     :param ip[str]: ip adddress
318     :param mask[str]: /24 prefix of 255.255.255.0 netmask
319     :return: IPv4Interface object
320     """
321     try:
322         return ipaddress.ip_interface(six.text_type('/'.join([ip, mask])))
323     except (TypeError, ValueError):
324         # None so we can skip later
325         return None
326
327
328 def ip_to_hex(ip_addr, separator=''):
329     try:
330         address = ipaddress.ip_address(six.text_type(ip_addr))
331     except ValueError:
332         logging.error("%s is not valid", ip_addr)
333         return ip_addr
334
335     if address.version != 4:
336         return ip_addr
337
338     if not separator:
339         return '{:08x}'.format(int(address))
340
341     return separator.join('{:02x}'.format(octet) for octet in address.packed)
342
343
344 def get_mask_from_ip_range(ip_low, ip_high):
345     _ip_low = ipaddress.ip_address(ip_low)
346     _ip_high = ipaddress.ip_address(ip_high)
347     _ip_low_int = int(_ip_low)
348     _ip_high_int = int(_ip_high)
349     return _ip_high.max_prefixlen - (_ip_high_int ^ _ip_low_int).bit_length()
350
351
352 def try_int(s, *args):
353     """Convert to integer if possible."""
354     try:
355         return int(s)
356     except (TypeError, ValueError):
357         return args[0] if args else s
358
359
360 class SocketTopology(dict):
361
362     @classmethod
363     def parse_cpuinfo(cls, cpuinfo):
364         socket_map = {}
365
366         lines = cpuinfo.splitlines()
367
368         core_details = []
369         core_lines = {}
370         for line in lines:
371             if line.strip():
372                 name, value = line.split(":", 1)
373                 core_lines[name.strip()] = try_int(value.strip())
374             else:
375                 core_details.append(core_lines)
376                 core_lines = {}
377
378         for core in core_details:
379             socket_map.setdefault(core["physical id"], {}).setdefault(
380                 core["core id"], {})[core["processor"]] = (
381                 core["processor"], core["core id"], core["physical id"])
382
383         return cls(socket_map)
384
385     def sockets(self):
386         return sorted(self.keys())
387
388     def cores(self):
389         return sorted(core for cores in self.values() for core in cores)
390
391     def processors(self):
392         return sorted(
393             proc for cores in self.values() for procs in cores.values() for
394             proc in procs)
395
396
397 def config_to_dict(config):
398     return {section: dict(config.items(section)) for section in
399             config.sections()}
400
401
402 def validate_non_string_sequence(value, default=None, raise_exc=None):
403     # NOTE(ralonsoh): refactor this function to check if raise_exc is an
404     # Exception. Remove duplicate code, this function is duplicated in this
405     # repository.
406     if isinstance(value, collections.Sequence) and not isinstance(value, six.string_types):
407         return value
408     if raise_exc:
409         raise raise_exc  # pylint: disable=raising-bad-type
410     return default
411
412
413 def join_non_strings(separator, *non_strings):
414     try:
415         non_strings = validate_non_string_sequence(non_strings[0], raise_exc=RuntimeError)
416     except (IndexError, RuntimeError):
417         pass
418     return str(separator).join(str(non_string) for non_string in non_strings)
419
420
421 def safe_decode_utf8(s):
422     """Safe decode a str from UTF"""
423     if six.PY3 and isinstance(s, bytes):
424         return s.decode('utf-8', 'surrogateescape')
425     return s
426
427
428 class ErrorClass(object):
429
430     def __init__(self, *args, **kwargs):
431         if 'test' not in kwargs:
432             raise RuntimeError
433
434     def __getattr__(self, item):
435         raise AttributeError
436
437
438 class Timer(object):
439     def __init__(self, timeout=None, raise_exception=True):
440         super(Timer, self).__init__()
441         self.start = self.delta = None
442         self._timeout = int(timeout) if timeout else None
443         self._timeout_flag = False
444         self._raise_exception = raise_exception
445
446     def _timeout_handler(self, *args):
447         self._timeout_flag = True
448         if self._raise_exception:
449             raise exceptions.TimerTimeout(timeout=self._timeout)
450         self.__exit__()
451
452     def __enter__(self):
453         self.start = datetime.datetime.now()
454         if self._timeout:
455             signal.signal(signal.SIGALRM, self._timeout_handler)
456             signal.alarm(self._timeout)
457         return self
458
459     def __exit__(self, *_):
460         if self._timeout:
461             signal.alarm(0)
462         self.delta = datetime.datetime.now() - self.start
463
464     def __getattr__(self, item):
465         return getattr(self.delta, item)
466
467     def __iter__(self):
468         self._raise_exception = False
469         return self.__enter__()
470
471     def next(self):  # pragma: no cover
472         # NOTE(ralonsoh): Python 2 support.
473         if not self._timeout_flag:
474             return datetime.datetime.now()
475         raise StopIteration()
476
477     def __next__(self):  # pragma: no cover
478         # NOTE(ralonsoh): Python 3 support.
479         return self.next()
480
481     def __del__(self):  # pragma: no cover
482         signal.alarm(0)
483
484     def delta_time_sec(self):
485         return (datetime.datetime.now() - self.start).total_seconds()
486
487
488 def read_meminfo(ssh_client):
489     """Read "/proc/meminfo" file and parse all keys and values"""
490
491     cpuinfo = six.BytesIO()
492     ssh_client.get_file_obj('/proc/meminfo', cpuinfo)
493     lines = cpuinfo.getvalue().decode('utf-8')
494     matches = re.findall(r"([\w\(\)]+):\s+(\d+)( kB)*", lines)
495     output = {}
496     for match in matches:
497         output[match[0]] = match[1]
498
499     return output
500
501
502 def setup_hugepages(ssh_client, size_kb):
503     """Setup needed number of hugepages for the size specified"""
504
505     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
506     meminfo = read_meminfo(ssh_client)
507     hp_size_kb = int(meminfo['Hugepagesize'])
508     hp_number = int(abs(size_kb / hp_size_kb))
509     ssh_client.execute(
510         'echo %s | sudo tee %s' % (hp_number, NR_HUGEPAGES_PATH))
511     hp = six.BytesIO()
512     ssh_client.get_file_obj(NR_HUGEPAGES_PATH, hp)
513     hp_number_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
514     logger.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
515                 hp_size_kb, hp_number, hp_number_set)
516     return hp_size_kb, hp_number, hp_number_set
517
518
519 def find_relative_file(path, task_path):
520     """
521     Find file in one of places: in abs of path or relative to a directory path,
522     in this order.
523
524     :param path:
525     :param task_path:
526     :return str: full path to file
527     """
528     # fixme: create schema to validate all fields have been provided
529     for lookup in [os.path.abspath(path), os.path.join(task_path, path)]:
530         try:
531             with open(lookup):
532                 return lookup
533         except IOError:
534             pass
535     raise IOError(errno.ENOENT, 'Unable to find {} file'.format(path))
536
537
538 def open_relative_file(path, task_path):
539     try:
540         return open(path)
541     except IOError as e:
542         if e.errno == errno.ENOENT:
543             return open(os.path.join(task_path, path))
544         raise
545
546
547 def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
548     """Wait until callable predicate is evaluated as True
549
550     When in a thread different from the main one, Timer(timeout) will fail
551     because signal is not handled. In this case
552
553     :param predicate: (func) callable deciding whether waiting should continue
554     :param timeout: (int) timeout in seconds how long should function wait
555     :param sleep: (int) polling interval for results in seconds
556     :param exception: exception instance to raise on timeout. If None is passed
557                       (default) then WaitTimeout exception is raised.
558     """
559     if isinstance(threading.current_thread(), threading._MainThread):
560         try:
561             with Timer(timeout=timeout):
562                 while not predicate():
563                     time.sleep(sleep)
564         except exceptions.TimerTimeout:
565             if exception and issubclass(exception, Exception):
566                 raise exception  # pylint: disable=raising-bad-type
567             raise exceptions.WaitTimeout
568     else:
569         with Timer() as timer:
570             while timer.delta_time_sec() < timeout:
571                 if predicate():
572                     return
573                 time.sleep(sleep)
574         if exception and issubclass(exception, Exception):
575             raise exception  # pylint: disable=raising-bad-type
576         raise exceptions.WaitTimeout
577
578
579 def send_socket_command(host, port, command):
580     """Send a string command to a specific port in a host
581
582     :param host: (str) ip or hostname of the host
583     :param port: (int) port number
584     :param command: (str) command to send
585     :return: 0 if success, error number if error
586     """
587     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
588     ret = 0
589     try:
590         err_number = sock.connect_ex((host, int(port)))
591         if err_number != 0:
592             return err_number
593         sock.sendall(six.b(command))
594     except Exception:  # pylint: disable=broad-except
595         ret = 1
596     finally:
597         sock.close()
598     return ret
599
600
601 def safe_cast(value, type_to_convert, default_value):
602     """Convert value to type, in case of error return default_value
603
604     :param value: value to convert
605     :param type_to_convert: type to convert, could be "type" or "string"
606     :param default_value: default value to return
607     :return: converted value or default_value
608     """
609     if isinstance(type_to_convert, type):
610         _type = type_to_convert
611     else:
612         _type = pydoc.locate(type_to_convert)
613         if not _type:
614             raise exceptions.InvalidType(type_to_convert=type_to_convert)
615
616     try:
617         return _type(value)
618     except ValueError:
619         return default_value