import subprocess
import sys
import time
+import threading
import six
from flask import jsonify
def mac_address_to_hex_list(mac):
- octets = ["0x{:02x}".format(int(elem, 16)) for elem in mac.split(':')]
- assert len(octets) == 6 and all(len(octet) == 4 for octet in octets)
+ try:
+ octets = ["0x{:02x}".format(int(elem, 16)) for elem in mac.split(':')]
+ except ValueError:
+ raise exceptions.InvalidMacAddress(mac_address=mac)
+ if len(octets) != 6 or all(len(octet) != 4 for octet in octets):
+ raise exceptions.InvalidMacAddress(mac_address=mac)
return octets
return address.version
+def make_ip_addr(ip, mask):
+ """
+ :param ip[str]: ip adddress
+ :param mask[str]: /24 prefix of 255.255.255.0 netmask
+ :return: IPv4Interface object
+ """
+ try:
+ return ipaddress.ip_interface(six.text_type('/'.join([ip, mask])))
+ except (TypeError, ValueError):
+ # None so we can skip later
+ return None
+
+
def ip_to_hex(ip_addr, separator=''):
try:
address = ipaddress.ip_address(six.text_type(ip_addr))
return separator.join('{:02x}'.format(octet) for octet in address.packed)
+def get_mask_from_ip_range(ip_low, ip_high):
+ _ip_low = ipaddress.ip_address(ip_low)
+ _ip_high = ipaddress.ip_address(ip_high)
+ _ip_low_int = int(_ip_low)
+ _ip_high_int = int(_ip_high)
+ return _ip_high.max_prefixlen - (_ip_high_int ^ _ip_low_int).bit_length()
+
+
def try_int(s, *args):
"""Convert to integer if possible."""
try:
class Timer(object):
- def __init__(self, timeout=None):
+ def __init__(self, timeout=None, raise_exception=True):
super(Timer, self).__init__()
self.start = self.delta = None
self._timeout = int(timeout) if timeout else None
+ self._timeout_flag = False
+ self._raise_exception = raise_exception
def _timeout_handler(self, *args):
- raise exceptions.TimerTimeout(timeout=self._timeout)
+ self._timeout_flag = True
+ if self._raise_exception:
+ raise exceptions.TimerTimeout(timeout=self._timeout)
+ self.__exit__()
def __enter__(self):
self.start = datetime.datetime.now()
def __getattr__(self, item):
return getattr(self.delta, item)
+ def __iter__(self):
+ self._raise_exception = False
+ return self.__enter__()
+
+ def next(self): # pragma: no cover
+ # NOTE(ralonsoh): Python 2 support.
+ if not self._timeout_flag:
+ return datetime.datetime.now()
+ raise StopIteration()
+
+ def __next__(self): # pragma: no cover
+ # NOTE(ralonsoh): Python 3 support.
+ return self.next()
+
+ def __del__(self): # pragma: no cover
+ signal.alarm(0)
+
+ def delta_time_sec(self):
+ return (datetime.datetime.now() - self.start).total_seconds()
+
def read_meminfo(ssh_client):
"""Read "/proc/meminfo" file and parse all keys and values"""
def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
"""Wait until callable predicate is evaluated as True
+ When in a thread different from the main one, Timer(timeout) will fail
+ because signal is not handled. In this case
+
:param predicate: (func) callable deciding whether waiting should continue
:param timeout: (int) timeout in seconds how long should function wait
:param sleep: (int) polling interval for results in seconds
:param exception: exception instance to raise on timeout. If None is passed
(default) then WaitTimeout exception is raised.
"""
- try:
- with Timer(timeout=timeout):
- while not predicate():
+ if isinstance(threading.current_thread(), threading._MainThread):
+ try:
+ with Timer(timeout=timeout):
+ while not predicate():
+ time.sleep(sleep)
+ except exceptions.TimerTimeout:
+ if exception and issubclass(exception, Exception):
+ raise exception # pylint: disable=raising-bad-type
+ raise exceptions.WaitTimeout
+ else:
+ with Timer() as timer:
+ while timer.delta_time_sec() < timeout:
+ if predicate():
+ return
time.sleep(sleep)
- except exceptions.TimerTimeout:
if exception and issubclass(exception, Exception):
raise exception # pylint: disable=raising-bad-type
raise exceptions.WaitTimeout
+
+
+def send_socket_command(host, port, command):
+ """Send a string command to a specific port in a host
+
+ :param host: (str) ip or hostname of the host
+ :param port: (int) port number
+ :param command: (str) command to send
+ :return: 0 if success, error number if error
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ ret = 0
+ try:
+ err_number = sock.connect_ex((host, int(port)))
+ if err_number != 0:
+ return err_number
+ sock.sendall(six.b(command))
+ except Exception: # pylint: disable=broad-except
+ ret = 1
+ finally:
+ sock.close()
+ return ret