+
+
+def mac_address_to_hex_list(mac):
+ octets = ["0x{:02x}".format(int(elem, 16)) for elem in mac.split(':')]
+ assert len(octets) == 6 and all(len(octet) == 4 for octet in octets)
+ return octets
+
+
+def safe_ip_address(ip_addr):
+ """ get ip address version v6 or v4 """
+ try:
+ return ipaddress.ip_address(six.text_type(ip_addr))
+ except ValueError:
+ logging.error("%s is not valid", ip_addr)
+ return None
+
+
+def get_ip_version(ip_addr):
+ """ get ip address version v6 or v4 """
+ try:
+ address = ipaddress.ip_address(six.text_type(ip_addr))
+ except ValueError:
+ logging.error("%s is not valid", ip_addr)
+ return None
+ else:
+ return address.version
+
+
+def ip_to_hex(ip_addr, separator=''):
+ try:
+ address = ipaddress.ip_address(six.text_type(ip_addr))
+ except ValueError:
+ logging.error("%s is not valid", ip_addr)
+ return ip_addr
+
+ if address.version != 4:
+ return ip_addr
+
+ if not separator:
+ return '{:08x}'.format(int(address))
+
+ return separator.join('{:02x}'.format(octet) for octet in address.packed)
+
+
+def try_int(s, *args):
+ """Convert to integer if possible."""
+ try:
+ return int(s)
+ except (TypeError, ValueError):
+ return args[0] if args else s
+
+
+class SocketTopology(dict):
+
+ @classmethod
+ def parse_cpuinfo(cls, cpuinfo):
+ socket_map = {}
+
+ lines = cpuinfo.splitlines()
+
+ core_details = []
+ core_lines = {}
+ for line in lines:
+ if line.strip():
+ name, value = line.split(":", 1)
+ core_lines[name.strip()] = try_int(value.strip())
+ else:
+ core_details.append(core_lines)
+ core_lines = {}
+
+ for core in core_details:
+ socket_map.setdefault(core["physical id"], {}).setdefault(
+ core["core id"], {})[core["processor"]] = (
+ core["processor"], core["core id"], core["physical id"])
+
+ return cls(socket_map)
+
+ def sockets(self):
+ return sorted(self.keys())
+
+ def cores(self):
+ return sorted(core for cores in self.values() for core in cores)
+
+ def processors(self):
+ return sorted(
+ proc for cores in self.values() for procs in cores.values() for
+ proc in procs)
+
+
+def config_to_dict(config):
+ return {section: dict(config.items(section)) for section in
+ config.sections()}
+
+
+def validate_non_string_sequence(value, default=None, raise_exc=None):
+ if isinstance(value, collections.Sequence) and not isinstance(value, str):
+ return value
+ if raise_exc:
+ raise raise_exc
+ return default
+
+
+def join_non_strings(separator, *non_strings):
+ try:
+ non_strings = validate_non_string_sequence(non_strings[0], raise_exc=RuntimeError)
+ except (IndexError, RuntimeError):
+ pass
+ return str(separator).join(str(non_string) for non_string in non_strings)
+
+
+class ErrorClass(object):
+
+ def __init__(self, *args, **kwargs):
+ if 'test' not in kwargs:
+ raise RuntimeError
+
+ def __getattr__(self, item):
+ raise AttributeError
+
+
+class Timer(object):
+ def __init__(self):
+ super(Timer, self).__init__()
+ self.start = self.delta = None
+
+ def __enter__(self):
+ self.start = datetime.datetime.now()
+ return self
+
+ def __exit__(self, *_):
+ self.delta = datetime.datetime.now() - self.start
+
+ def __getattr__(self, item):
+ return getattr(self.delta, item)