Merge "Xena_cont_accuracy: Modify continuous from duration to packet limit"
[vswitchperf.git] / tools / tasks.py
index f8f11d4..9816a33 100644 (file)
@@ -23,12 +23,13 @@ import threading
 import sys
 import os
 import locale
+import time
 
 from conf import settings
+from tools import systeminfo
 
 
 CMD_PREFIX = 'cmd : '
-_MY_ENCODING = locale.getdefaultlocale()[1]
 
 def _get_stdout():
     """Get stdout value for ``subprocess`` calls.
@@ -67,6 +68,7 @@ def run_task(cmd, logger, msg=None, check_error=False):
 
     stdout = []
     stderr = []
+    my_encoding = locale.getdefaultlocale()[1]
 
     if msg:
         logger.info(msg)
@@ -74,8 +76,9 @@ def run_task(cmd, logger, msg=None, check_error=False):
     logger.debug('%s%s', CMD_PREFIX, ' '.join(cmd))
 
     try:
-        proc = subprocess.Popen(
-            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0)
+        proc = subprocess.Popen(map(os.path.expanduser, cmd),
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE, bufsize=0)
 
         while True:
             reads = [proc.stdout.fileno(), proc.stderr.fileno()]
@@ -83,17 +86,24 @@ def run_task(cmd, logger, msg=None, check_error=False):
 
             for file_d in ret[0]:
                 if file_d == proc.stdout.fileno():
-                    line = proc.stdout.readline()
-                    if settings.getValue('VERBOSITY') == 'debug':
-                        sys.stdout.write(line.decode(_MY_ENCODING))
-                    stdout.append(line)
+                    while True:
+                        line = proc.stdout.readline()
+                        if not line:
+                            break
+                        if settings.getValue('VERBOSITY') == 'debug':
+                            sys.stdout.write(line.decode(my_encoding))
+                        stdout.append(line)
                 if file_d == proc.stderr.fileno():
-                    line = proc.stderr.readline()
-                    sys.stderr.write(line.decode(_MY_ENCODING))
-                    stderr.append(line)
+                    while True:
+                        line = proc.stderr.readline()
+                        if not line:
+                            break
+                        sys.stderr.write(line.decode(my_encoding))
+                        stderr.append(line)
 
             if proc.poll() is not None:
                 break
+
     except OSError as ex:
         handle_error(ex)
     else:
@@ -101,8 +111,8 @@ def run_task(cmd, logger, msg=None, check_error=False):
             ex = subprocess.CalledProcessError(proc.returncode, cmd, stderr)
             handle_error(ex)
 
-    return ('\n'.join(sout.decode(_MY_ENCODING).strip() for sout in stdout),
-            ('\n'.join(sout.decode(_MY_ENCODING).strip() for sout in stderr)))
+    return ('\n'.join(sout.decode(my_encoding).strip() for sout in stdout),
+            ('\n'.join(sout.decode(my_encoding).strip() for sout in stderr)))
 
 def run_background_task(cmd, logger, msg):
     """Run task in background and log when started.
@@ -120,7 +130,7 @@ def run_background_task(cmd, logger, msg):
     logger.info(msg)
     logger.debug('%s%s', CMD_PREFIX, ' '.join(cmd))
 
-    proc = subprocess.Popen(cmd, stdout=_get_stdout(), bufsize=0)
+    proc = subprocess.Popen(map(os.path.expanduser, cmd), stdout=_get_stdout(), bufsize=0)
 
     return proc.pid
 
@@ -148,6 +158,55 @@ def run_interactive_task(cmd, logger, msg):
 
     return child
 
+def terminate_task_subtree(pid, signal='-15', sleep=10, logger=None):
+    """Terminate given process and all its children
+
+    Function will sent given signal to the process. In case
+    that process will not terminate within given sleep interval
+    and signal was not SIGKILL, then process will be killed by SIGKILL.
+    After that function will check if all children of the process
+    are terminated and if not the same terminating procedure is applied
+    on any living child (only one level of children is considered).
+
+    :param pid: Process ID to terminate
+    :param signal: Signal to be sent to the process
+    :param sleep: Maximum delay in seconds after signal is sent
+    :param logger: Logger to write details to
+    """
+    try:
+        output = subprocess.check_output("pgrep -P " + str(pid), shell=True).decode().rstrip('\n')
+    except subprocess.CalledProcessError:
+        output = ""
+
+    terminate_task(pid, signal, sleep, logger)
+
+    # just for case children were kept alive
+    children = output.split('\n')
+    for child in children:
+        terminate_task(child, signal, sleep, logger)
+
+def terminate_task(pid, signal='-15', sleep=10, logger=None):
+    """Terminate process with given pid
+
+    Function will sent given signal to the process. In case
+    that process will not terminate within given sleep interval
+    and signal was not SIGKILL, then process will be killed by SIGKILL.
+
+    :param pid: Process ID to terminate
+    :param signal: Signal to be sent to the process
+    :param sleep: Maximum delay in seconds after signal is sent
+    :param logger: Logger to write details to
+    """
+    if systeminfo.pid_isalive(pid):
+        run_task(['sudo', 'kill', signal, str(pid)], logger)
+        logger.debug('Wait for process %s to terminate after signal %s', pid, signal)
+        for dummy in range(sleep):
+            time.sleep(1)
+            if not systeminfo.pid_isalive(pid):
+                break
+
+        if signal.lstrip('-').upper() not in ('9', 'KILL', 'SIGKILL') and systeminfo.pid_isalive(pid):
+            terminate_task(pid, '-9', sleep, logger)
 
 class Process(object):
     """Control an instance of a long-running process.
@@ -240,12 +299,14 @@ class Process(object):
             self.kill()
             raise exc
 
-    def kill(self):
+    def kill(self, signal='-15', sleep=10):
         """Kill process instance if it is alive.
+
+        :param signal: signal to be sent to the process
+        :param sleep: delay in seconds after signal is sent
         """
-        if self._child and self._child.isalive():
-            run_task(['sudo', 'kill', '-2', str(self._child.pid)],
-                     self._logger)
+        if self.is_running():
+            terminate_task_subtree(self._child.pid, signal, sleep, self._logger)
 
             if self.is_relinquished():
                 self._relinquish_thread.join()
@@ -268,7 +329,7 @@ class Process(object):
 
         :returns: True if process is running, else False
         """
-        return self._child is not None
+        return self._child and self._child.isalive()
 
     def _affinitize_pid(self, core, pid):
         """Affinitize a process with ``pid`` to ``core``.
@@ -291,7 +352,7 @@ class Process(object):
         """
         self._logger.info('Affinitizing process')
 
-        if self._child and self._child.isalive():
+        if self.is_running():
             self._affinitize_pid(core, self._child.pid)
 
     class ContinueReadPrintLoop(threading.Thread):