fix container image and use latest code
[samplevnf.git] / VNFs / DPPD-PROX / helper-scripts / rapid / prox_ctrl.py
index ba21913..40375c5 100644 (file)
@@ -1,5 +1,5 @@
 ##
-## Copyright (c) 2010-2019 Intel Corporation
+## Copyright (c) 2010-2020 Intel Corporation
 ##
 ## Licensed under the Apache License, Version 2.0 (the "License");
 ## you may not use this file except in compliance with the License.
 ##
 
 from __future__ import print_function
+from __future__ import division
 
+from builtins import map
+from builtins import range
+from past.utils import old_div
+from builtins import object
 import os
+import time
 import subprocess
 import socket
+from rapid_log import RapidLog
 
 class prox_ctrl(object):
     def __init__(self, ip, key=None, user=None):
         self._ip   = ip
         self._key  = key
         self._user = user
-        self._children = []
         self._proxsock = []
 
     def ip(self):
         return self._ip
 
-    def connect(self):
+    def test_connect(self):
         """Simply try to run 'true' over ssh on remote system.
         On failure, raise RuntimeWarning exception when possibly worth
         retrying, and raise RuntimeError exception otherwise.
         """
-        return self.run_cmd('true', True)
+        return self.run_cmd('test -e /opt/rapid/system_ready_for_rapid', True)
+
+    def connect(self):
+        attempts = 1
+        RapidLog.debug("Trying to connect to machine \
+                on %s, attempt: %d" % (self._ip, attempts))
+        while True:
+            try:
+                self.test_connect()
+                break
+            except RuntimeWarning as ex:
+                RapidLog.debug("RuntimeWarning %d:\n%s"
+                    % (ex.returncode, ex.output.strip()))
+                attempts += 1
+                if attempts > 20:
+                    RapidLog.exception("Failed to connect to instance after %d\
+                            attempts:\n%s" % (attempts, ex))
+                time.sleep(2)
+                RapidLog.debug("Trying to connect to machine \
+                       on %s, attempt: %d" % (self._ip, attempts))
+        RapidLog.debug("Connected to machine on %s" % self._ip)
+
+    def connect_socket(self):
+        attempts = 1
+        RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
+                attempt: %d" % (self._ip, attempts))
+        sock = None
+        while True:
+            sock = self.prox_sock()
+            if sock is not None:
+                break
+            attempts += 1
+            if attempts > 20:
+                RapidLog.exception("Failed to connect to PROX on %s after %d \
+                        attempts" % (self._ip, attempts))
+            time.sleep(2)
+            RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
+                    attempt: %d" % (self._ip, attempts))
+        RapidLog.info("Connected to PROX on %s" % self._ip)
+        return sock
 
     def close(self):
-        """Must be called before program termination."""
-#        for prox in self._proxsock:
-#            prox.quit()
-        children = len(self._children)
-        if children == 0:
-            return
-        if children > 1:
-            print('Waiting for %d child processes to complete ...' % children)
-        for child in self._children:
-            ret = os.waitpid(child[0], os.WNOHANG)
-            if ret[0] == 0:
-                print("Waiting for child process '%s' to complete ..." % child[1])
-                ret = os.waitpid(child[0], 0)
-            rc = ret[1]
-            if os.WIFEXITED(rc):
-                if os.WEXITSTATUS(rc) == 0:
-                    print("Child process '%s' completed successfully" % child[1])
-                else:
-                    print("Child process '%s' returned exit status %d" % (
-                            child[1], os.WEXITSTATUS(rc)))
-            elif os.WIFSIGNALED(rc):
-                print("Child process '%s' exited on signal %d" % (
-                        child[1], os.WTERMSIG(rc)))
-            else:
-                print("Wait status for child process '%s' is 0x%04x" % (
-                        child[1], rc))
+        for sock in self._proxsock:
+            sock.quit()
 
     def run_cmd(self, command, _connect=False):
         """Execute command over ssh on remote system.
@@ -76,34 +97,9 @@ class prox_ctrl(object):
         try:
             return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
         except subprocess.CalledProcessError as ex:
-            if _connect and ex.returncode == 255:
-                raise RuntimeWarning(ex.output.strip())
-            raise RuntimeError('ssh returned exit status %d:\n%s'
+            RapidLog.exception('ssh returned exit status %d:\n%s'
                     % (ex.returncode, ex.output.strip()))
 
-    def fork_cmd(self, command, name=None):
-        """Execute command over ssh on remote system, in a child process.
-        Do not wait for remote command completion.
-        Return child process id.
-        """
-        if name is None:
-            name = command
-        cmd = self._build_ssh(command)
-        pid = os.fork()
-        if (pid != 0):
-            # In the parent process
-            self._children.append((pid, name))
-            return pid
-        # In the child process: use os._exit to terminate
-        try:
-            # Actually ignore output on success, but capture stderr on failure
-            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
-        except subprocess.CalledProcessError as ex:
-            raise RuntimeError("Child process '%s' failed:\n"
-                    'ssh returned exit status %d:\n%s'
-                    % (name, ex.returncode, ex.output.strip()))
-        os._exit(0)
-
     def prox_sock(self, port=8474):
         """Connect to the PROX instance on remote system.
         Return a prox_sock object on success, None on failure.
@@ -136,7 +132,29 @@ class prox_ctrl(object):
             # Actually ignore output on success, but capture stderr on failure
             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
         except subprocess.CalledProcessError as ex:
-            raise RuntimeError('scp returned exit status %d:\n%s'
+            RapidLog.exception('scp returned exit status %d:\n%s'
+                    % (ex.returncode, ex.output.strip()))
+
+    def scp_get(self, src, dst):
+        """Copy src file from remote system to dst on local system."""
+        cmd = [ 'scp',
+                '-B',
+                '-oStrictHostKeyChecking=no',
+                '-oUserKnownHostsFile=/dev/null',
+                '-oLogLevel=ERROR' ]
+        if self._key is not None:
+            cmd.extend(['-i', self._key])
+        remote = ''
+        if self._user is not None:
+            remote += self._user + '@'
+        remote += self._ip + ':/home/' + self._user + src
+        cmd.append(remote)
+        cmd.append(dst)
+        try:
+            # Actually ignore output on success, but capture stderr on failure
+            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+        except subprocess.CalledProcessError as ex:
+            RapidLog.exception('scp returned exit status %d:\n%s'
                     % (ex.returncode, ex.output.strip()))
 
     def _build_ssh(self, command):
@@ -160,9 +178,8 @@ class prox_sock(object):
         self._sock = sock
         self._rcvd = b''
 
-    def quit(self):
+    def __del__(self):
         if self._sock is not None:
-            self._send('quit')
             self._sock.close()
             self._sock = None
 
@@ -174,84 +191,70 @@ class prox_sock(object):
 
     def speed(self, speed, cores, tasks=[0]):
         for core in cores:
-               for task in tasks:
-                       self._send('speed %s %s %s' % (core, task, speed))
+            for task in tasks:
+                self._send('speed %s %s %s' % (core, task, speed))
 
     def reset_stats(self):
         self._send('reset stats')
 
     def lat_stats(self, cores, tasks=[0]):
-        min_lat = 999999999
-       max_lat = avg_lat = 0
-       number_tasks_returning_stats = 0
-       buckets = [0] * 128
-        self._send('lat all stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks))))
-        for core in cores:
-               for task in tasks:
-                       stats = self._recv().split(',')
-                       if 'is not measuring' in stats[0]:
-                               continue
-                       if stats[0].startswith('error'):
-                               log.critical("lat stats error: unexpected reply from PROX (potential incompatibility between scripts and PROX)")
-                               raise Exception("lat stats error")
-                       number_tasks_returning_stats += 1
-                       min_lat = min(int(stats[0]),min_lat)
-                       max_lat = max(int(stats[1]),max_lat)
-                       avg_lat += int(stats[2])
-                       #min_since begin = int(stats[3])
-                       #max_since_begin = int(stats[4])
-                       tsc = int(stats[5]) # Taking the last tsc as the timestamp since PROX will return the same tsc for each core/task combination 
-                       hz = int(stats[6])
-                       #coreid = int(stats[7])
-                       #taskid = int(stats[8])
-                       stats = self._recv().split(':')
-                       if stats[0].startswith('error'):
-                               log.critical("lat stats error: unexpected lat bucket reply (potential incompatibility between scripts and PROX)")
-                               raise Exception("lat bucket reply error")
-                       buckets[0] = int(stats[1])
-                       for i in range(1, 128):
-                               stats = self._recv().split(':')
-                               buckets[i] = int(stats[1])
-        avg_lat = avg_lat/number_tasks_returning_stats
-        self._send('stats latency(0).used')
-        used = float(self._recv())
-        self._send('stats latency(0).total')
-        total = float(self._recv())
-        return min_lat, max_lat, avg_lat, (used/total), tsc, hz, buckets
-
-    def old_lat_stats(self, cores, tasks=[0]):
-        min_lat = 999999999
-       max_lat = avg_lat = 0
-       number_tasks_returning_stats = 0
-       buckets = [0] * 128
-        self._send('lat stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks))))
+        result = {}
+        result['lat_min'] = 999999999
+        result['lat_max'] = result['lat_avg'] = 0
+        result['buckets'] = [0] * 128
+        result['mis_ordered'] = 0
+        result['extent'] = 0
+        result['duplicate'] = 0
+        number_tasks_returning_stats = 0
+        self._send('lat all stats %s %s' % (','.join(map(str, cores)),
+            ','.join(map(str, tasks))))
         for core in cores:
-               for task in tasks:
-                       stats = self._recv().split(',')
-                       if stats[0].startswith('error'):
-                               if stats[0].startswith('error: invalid syntax'):
-                                       log.critical("lat stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)")
-                                       raise Exception("lat stats error")
-                               continue
-                       number_tasks_returning_stats += 1
-                       min_lat = min(int(stats[0]),min_lat)
-                       max_lat = max(int(stats[1]),max_lat)
-                       avg_lat += int(stats[2])
-                       #min_since begin = int(stats[3])
-                       #max_since_begin = int(stats[4])
-                       tsc = int(stats[5])
-                       hz = int(stats[6])
-                       #coreid = int(stats[7])
-                       #taskid = int(stats[8])
-        avg_lat = avg_lat/number_tasks_returning_stats
+            for task in tasks:
+                stats = self._recv().split(',')
+            if 'is not measuring' in stats[0]:
+                continue
+            if stats[0].startswith('error'):
+                RapidLog.critical("lat stats error: unexpected reply from PROX\
+                        (potential incompatibility between scripts and PROX)")
+                raise Exception("lat stats error")
+            number_tasks_returning_stats += 1
+            result['lat_min'] = min(int(stats[0]),result['lat_min'])
+            result['lat_max'] = max(int(stats[1]),result['lat_max'])
+            result['lat_avg'] += int(stats[2])
+            #min_since begin = int(stats[3])
+            #max_since_begin = int(stats[4])
+            result['lat_tsc'] = int(stats[5])
+            # Taking the last tsc as the timestamp since
+            # PROX will return the same tsc for each
+            # core/task combination
+            result['lat_hz'] = int(stats[6])
+            #coreid = int(stats[7])
+            #taskid = int(stats[8])
+            result['mis_ordered'] += int(stats[9])
+            result['extent'] += int(stats[10])
+            result['duplicate'] += int(stats[11])
+            stats = self._recv().split(':')
+            if stats[0].startswith('error'):
+                RapidLog.critical("lat stats error: unexpected lat bucket \
+                        reply (potential incompatibility between scripts \
+                        and PROX)")
+                raise Exception("lat bucket reply error")
+            result['buckets'][0] = int(stats[1])
+            for i in range(1, 128):
+                stats = self._recv().split(':')
+                result['buckets'][i] += int(stats[1])
+        result['lat_avg'] = old_div(result['lat_avg'],
+                number_tasks_returning_stats)
         self._send('stats latency(0).used')
         used = float(self._recv())
         self._send('stats latency(0).total')
         total = float(self._recv())
-        return min_lat, max_lat, avg_lat, (used/total), tsc, hz, buckets
+        result['lat_used'] = old_div(used,total)
+        return (result)
 
     def irq_stats(self, core, bucket, task=0):
-        self._send('stats task.core(%s).task(%s).irq(%s)' % (core, task, bucket))
+        self._send('stats task.core(%s).task(%s).irq(%s)' %
+                (core, task, bucket))
         stats = self._recv().split(',')
         return int(stats[0])
 
@@ -259,71 +262,93 @@ class prox_sock(object):
         rx = tx = drop = tsc = hz = 0
         self._send('show irq buckets %s %s' % (core,task))
         buckets = self._recv().split(';')
-       buckets = buckets[:-1]
+        buckets = buckets[:-1]
         return buckets
 
     def core_stats(self, cores, tasks=[0]):
         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
-        self._send('dp core stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks))))
+        self._send('dp core stats %s %s' % (','.join(map(str, cores)),
+            ','.join(map(str, tasks))))
         for core in cores:
-               for task in tasks:
-                       stats = self._recv().split(',')
-                       if stats[0].startswith('error'):  
-                               if stats[0].startswith('error: invalid syntax'):
-                                       log.critical("dp core stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)")
-                                       raise Exception("dp core stats error")
-                               continue
-                       rx += int(stats[0])
-                       tx += int(stats[1])
-                       rx_non_dp += int(stats[2])
-                       tx_non_dp += int(stats[3])
-                       drop += int(stats[4])
-                       tx_fail += int(stats[5])
-                       tsc = int(stats[6])
-                       hz = int(stats[7])
+            for task in tasks:
+                stats = self._recv().split(',')
+                if stats[0].startswith('error'):
+                    if stats[0].startswith('error: invalid syntax'):
+                        RapidLog.critical("dp core stats error: unexpected \
+                                invalid syntax (potential incompatibility \
+                                between scripts and PROX)")
+                        raise Exception("dp core stats error")
+                    continue
+                rx += int(stats[0])
+                tx += int(stats[1])
+                rx_non_dp += int(stats[2])
+                tx_non_dp += int(stats[3])
+                drop += int(stats[4])
+                tx_fail += int(stats[5])
+                tsc = int(stats[6])
+                hz = int(stats[7])
         return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
 
     def multi_port_stats(self, ports=[0]):
         rx = tx = port_id = tsc = no_mbufs = errors = 0
         self._send('multi port stats %s' % (','.join(map(str, ports))))
-       result = self._recv().split(';')
-       if result[0].startswith('error'):  
-               log.critical("multi port stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)")
-               raise Exception("multi port stats error")
+        result = self._recv().split(';')
+        if result[0].startswith('error'):
+            RapidLog.critical("multi port stats error: unexpected invalid \
+                    syntax (potential incompatibility between scripts and \
+                    PROX)")
+            raise Exception("multi port stats error")
         for statistics in result:
-               stats = statistics.split(',')
-               port_id = int(stats[0])
-               rx += int(stats[1])
-               tx += int(stats[2])
-               no_mbufs += int(stats[3])
-               errors += int(stats[4])
-               tsc = int(stats[5])
+            stats = statistics.split(',')
+            port_id = int(stats[0])
+            rx += int(stats[1])
+            tx += int(stats[2])
+            no_mbufs += int(stats[3])
+            errors += int(stats[4])
+            tsc = int(stats[5])
         return rx, tx, no_mbufs, errors, tsc
 
     def set_random(self, cores, task, offset, mask, length):
-        self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, mask, length))
+        self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
+            task, offset, mask, length))
 
     def set_size(self, cores, task, pkt_size):
-        self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, pkt_size))
+        self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
+            pkt_size))
+
+    def set_imix(self, cores, task, imix):
+        self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
+            ','.join(map(str,imix))))
 
     def set_value(self, cores, task, offset, value, length):
-        self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, value, length))
+        self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
+            task, offset, value, length))
+
+    def quit_prox(self):
+        self._send('quit')
 
     def _send(self, cmd):
         """Append LF and send command to the PROX instance."""
         if self._sock is None:
             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
-        self._sock.sendall(cmd.encode() + b'\n')
+        try:
+            self._sock.sendall(cmd.encode() + b'\n')
+        except ConnectionResetError as e:
+            RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+            raise
 
     def _recv(self):
-        """Receive response from PROX instance, and return it with LF removed."""
+        """Receive response from PROX instance, return it with LF removed."""
         if self._sock is None:
             raise RuntimeError("PROX socket closed, cannot receive anymore")
-        pos = self._rcvd.find(b'\n')
-        while pos == -1:
-            self._rcvd += self._sock.recv(256)
+        try:
             pos = self._rcvd.find(b'\n')
-        rsp = self._rcvd[:pos]
-        self._rcvd = self._rcvd[pos+1:]
+            while pos == -1:
+                self._rcvd += self._sock.recv(256)
+                pos = self._rcvd.find(b'\n')
+            rsp = self._rcvd[:pos]
+            self._rcvd = self._rcvd[pos+1:]
+        except ConnectionResetError as e:
+            RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+            raise
         return rsp.decode()
-