Using python concurrent futures 56/71156/1
authorLuc Provoost <luc.provoost@intel.com>
Thu, 17 Sep 2020 15:17:36 +0000 (17:17 +0200)
committerLuc Provoost <luc.provoost@intel.com>
Thu, 17 Sep 2020 15:19:14 +0000 (17:19 +0200)
Different PROX instances are now started in parallel. The script is
starting multiple threads.

Change-Id: Ia8785a792240d4e9b5d5d98174bc4c5d7ae5657c
Signed-off-by: Luc Provoost <luc.provoost@intel.com>
VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
VNFs/DPPD-PROX/helper-scripts/rapid/rapid_machine.py
VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py

index 6f00584..a6dd4bc 100644 (file)
@@ -32,7 +32,6 @@ class prox_ctrl(object):
         self._ip   = ip
         self._key  = key
         self._user = user
-        self._children = []
         self._proxsock = []
 
     def __del__(self):
@@ -50,7 +49,7 @@ class prox_ctrl(object):
 
     def connect(self):
         attempts = 1
-        RapidLog.debug("Trying to connect to instance which was just launched \
+        RapidLog.debug("Trying to connect to machine \
                 on %s, attempt: %d" % (self._ip, attempts))
         while True:
             try:
@@ -64,9 +63,9 @@ class prox_ctrl(object):
                     raise Exception("Failed to connect to instance after %d \
                             attempts:\n%s" % (attempts, ex))
                 time.sleep(2)
-                RapidLog.debug("Trying to connect to instance which was just \
-                        launched on %s, attempt: %d" % (self._ip, attempts))
-        RapidLog.debug("Connected to instance on %s" % self._ip)
+                RapidLog.debug("Trying to connect to machine \
+                       on %s, attempt: %d" % (self._ip, attempts))
+        RapidLog.debug("Connected to machine on %s" % self._ip)
 
     def connect_socket(self):
         attempts = 1
@@ -93,31 +92,6 @@ class prox_ctrl(object):
         """Must be called before program termination."""
         for sock in self._proxsock:
             sock.quit()
-        children = len(self._children)
-        if children == 0:
-            return
-        if children > 1:
-            print('Waiting for %d child processes to complete ...' % children)
-        for child in self._children:
-            ret = os.waitpid(child[0], os.WNOHANG)
-            if ret[0] == 0:
-                print("Waiting for child process '%s' to complete ..." 
-                        % child[1])
-                ret = os.waitpid(child[0], 0)
-            rc = ret[1]
-            if os.WIFEXITED(rc):
-                if os.WEXITSTATUS(rc) == 0:
-                    print("Child process '%s' completed successfully" 
-                            % child[1])
-                else:
-                    print("Child process '%s' returned exit status %d" % (
-                            child[1], os.WEXITSTATUS(rc)))
-            elif os.WIFSIGNALED(rc):
-                print("Child process '%s' exited on signal %d" % (
-                        child[1], os.WTERMSIG(rc)))
-            else:
-                print("Wait status for child process '%s' is 0x%04x" % (
-                        child[1], rc))
 
     def run_cmd(self, command, _connect=False):
         """Execute command over ssh on remote system.
@@ -135,29 +109,6 @@ class prox_ctrl(object):
             raise RuntimeError('ssh returned exit status %d:\n%s'
                     % (ex.returncode, ex.output.strip()))
 
-    def fork_cmd(self, command, name=None):
-        """Execute command over ssh on remote system, in a child process.
-        Do not wait for remote command completion.
-        Return child process id.
-        """
-        if name is None:
-            name = command
-        cmd = self._build_ssh(command)
-        pid = os.fork()
-        if (pid != 0):
-            # In the parent process
-            self._children.append((pid, name))
-            return pid
-        # In the child process: use os._exit to terminate
-        try:
-            # Actually ignore output on success, but capture stderr on failure
-            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
-        except subprocess.CalledProcessError as ex:
-            raise RuntimeError("Child process '%s' failed:\n"
-                    'ssh returned exit status %d:\n%s'
-                    % (name, ex.returncode, ex.output.strip()))
-        os._exit(0)
-
     def prox_sock(self, port=8474):
         """Connect to the PROX instance on remote system.
         Return a prox_sock object on success, None on failure.
index b89c038..a9f5308 100644 (file)
@@ -45,12 +45,7 @@ class RapidMachine(object):
                 break
         self.rundir = rundir
         self.machine_params = machine_params
-        self._client = prox_ctrl(self.ip, self.key, self.user)
-        self._client.connect()
-        if vim in ['OpenStack']:
-            self.devbind()
-        self.generate_lua(vim)
-        self._client.scp_put(self.machine_params['config_file'], '{}/{}'.format(self.rundir, machine_params['config_file']))
+        self.vim = vim
 
     def __del__(self):
         self._client.scp_get('/prox.log', './{}.prox.log'.format(self.name))
@@ -99,10 +94,23 @@ class RapidMachine(object):
         self._client.scp_put('helper.lua', self.rundir + '/helper.lua')
 
     def start_prox(self, autostart=''):
+        self._client = prox_ctrl(self.ip, self.key, self.user)
+        self._client.connect()
+        if self.vim in ['OpenStack']:
+            self.devbind()
+        self.generate_lua(self.vim)
+        self._client.scp_put(self.machine_params['config_file'], '{}/{}'.format(self.rundir, self.machine_params['config_file']))
         if self.machine_params['prox_launch_exit']:
             cmd = 'sudo {}/prox {} -t -o cli -f {}/{}'.format(self.rundir, autostart, self.rundir, self.machine_params['config_file'])
-            result = self._client.fork_cmd(cmd, 'PROX Testing on {}'.format(self.name))
-            RapidLog.debug("Starting PROX on {}: {}, {}".format(self.name, cmd, result))
+            RapidLog.debug("Starting PROX on {}: {}".format(self.name, cmd))
+            result = self._client.run_cmd(cmd, 'PROX Testing on {}'.format(self.name))
+            #RapidLog.debug("Finished PROX on {}: {}, {}".format(self.name, cmd, result))
+            RapidLog.debug("Finished PROX on {}: {}".format(self.name, cmd))
+
+    def close_prox(self):
+        self.socket.quit()
+
+    def connect_prox(self):
         self.socket = self._client.connect_socket()
 
     def start(self):
index 023b4bc..2c18b23 100755 (executable)
@@ -23,6 +23,8 @@ from future import standard_library
 standard_library.install_aliases()
 from builtins import object
 import sys
+import concurrent.futures
+from concurrent.futures import ALL_COMPLETED
 from rapid_cli import RapidCli
 from rapid_log import RapidLog
 from rapid_parser import RapidConfigParser
@@ -81,8 +83,11 @@ class RapidTestManager(object):
             machines.append(machine)
         if test_params['configonly']:
             sys.exit()
-        for machine in machines:
-            machine.start_prox()
+        prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(machines))
+        future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in machines}
+        with concurrent.futures.ThreadPoolExecutor(max_workers=len(machines)) as executor:
+            future_to_connect_prox = {executor.submit(machine.connect_prox): machine for machine in machines}
+            concurrent.futures.wait(future_to_connect_prox,return_when=ALL_COMPLETED)
         result = True
         for test_param in test_params['tests']:
             RapidLog.info(test_param['test'])
@@ -119,6 +124,8 @@ class RapidTestManager(object):
             single_test_result = test.run()
             if not single_test_result:
                 result = False
+        for machine in machines:
+            machine.close_prox()
         return (result)
 
 def main():