import subprocess
import sys
import time
+import threading
import six
from flask import jsonify
def mac_address_to_hex_list(mac):
- octets = ["0x{:02x}".format(int(elem, 16)) for elem in mac.split(':')]
- assert len(octets) == 6 and all(len(octet) == 4 for octet in octets)
+ try:
+ octets = ["0x{:02x}".format(int(elem, 16)) for elem in mac.split(':')]
+ except ValueError:
+ raise exceptions.InvalidMacAddress(mac_address=mac)
+ if len(octets) != 6 or all(len(octet) != 4 for octet in octets):
+ raise exceptions.InvalidMacAddress(mac_address=mac)
return octets
return separator.join('{:02x}'.format(octet) for octet in address.packed)
+def get_mask_from_ip_range(ip_low, ip_high):
+ _ip_low = ipaddress.ip_address(ip_low)
+ _ip_high = ipaddress.ip_address(ip_high)
+ _ip_low_int = int(_ip_low)
+ _ip_high_int = int(_ip_high)
+ return _ip_high.max_prefixlen - (_ip_high_int ^ _ip_low_int).bit_length()
+
+
def try_int(s, *args):
"""Convert to integer if possible."""
try:
def __del__(self): # pragma: no cover
signal.alarm(0)
+ def delta_time_sec(self):
+ return (datetime.datetime.now() - self.start).total_seconds()
+
def read_meminfo(ssh_client):
"""Read "/proc/meminfo" file and parse all keys and values"""
def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
"""Wait until callable predicate is evaluated as True
+ When in a thread different from the main one, Timer(timeout) will fail
+ because signal is not handled. In this case
+
:param predicate: (func) callable deciding whether waiting should continue
:param timeout: (int) timeout in seconds how long should function wait
:param sleep: (int) polling interval for results in seconds
:param exception: exception instance to raise on timeout. If None is passed
(default) then WaitTimeout exception is raised.
"""
- try:
- with Timer(timeout=timeout):
- while not predicate():
+ if isinstance(threading.current_thread(), threading._MainThread):
+ try:
+ with Timer(timeout=timeout):
+ while not predicate():
+ time.sleep(sleep)
+ except exceptions.TimerTimeout:
+ if exception and issubclass(exception, Exception):
+ raise exception # pylint: disable=raising-bad-type
+ raise exceptions.WaitTimeout
+ else:
+ with Timer() as timer:
+ while timer.delta_time_sec() < timeout:
+ if predicate():
+ return
time.sleep(sleep)
- except exceptions.TimerTimeout:
if exception and issubclass(exception, Exception):
raise exception # pylint: disable=raising-bad-type
raise exceptions.WaitTimeout