Fix issue with system_ready_for_rapid file
[samplevnf.git] / VNFs / DPPD-PROX / helper-scripts / rapid / prox_ctrl.py
index a9497e1..8754ebc 100644 (file)
@@ -25,33 +25,32 @@ import os
 import time
 import subprocess
 import socket
-from rapid_log import RapidLog 
+from rapid_log import RapidLog
+from rapid_sshclient import SSHClient
 
 class prox_ctrl(object):
-    def __init__(self, ip, key=None, user=None):
+    def __init__(self, ip, key=None, user=None, password = None):
         self._ip   = ip
         self._key  = key
         self._user = user
+        self._password = password
         self._proxsock = []
+        self._sshclient = SSHClient(ip = ip, user = user, password = password,
+                rsa_private_key = key, timeout = None)
 
     def ip(self):
         return self._ip
 
-    def test_connect(self):
-        """Simply try to run 'true' over ssh on remote system.
-        On failure, raise RuntimeWarning exception when possibly worth
-        retrying, and raise RuntimeError exception otherwise.
-        """
-        return self.run_cmd('test -e /opt/rapid/system_ready_for_rapid', True)
-
-    def connect(self):
+    def test_connection(self):
         attempts = 1
         RapidLog.debug("Trying to connect to machine \
                 on %s, attempt: %d" % (self._ip, attempts))
         while True:
             try:
-                self.test_connect()
-                break
+                if (self.run_cmd('test -e /opt/rapid/system_ready_for_rapid \
+                        && echo exists')):
+                    break
+                time.sleep(2)
             except RuntimeWarning as ex:
                 RapidLog.debug("RuntimeWarning %d:\n%s"
                     % (ex.returncode, ex.output.strip()))
@@ -87,18 +86,9 @@ class prox_ctrl(object):
         for sock in self._proxsock:
             sock.quit()
 
-    def run_cmd(self, command, _connect=False):
-        """Execute command over ssh on remote system.
-        Wait for remote command completion.
-        Return command output (combined stdout and stderr).
-        _connect argument is reserved for connect() method.
-        """
-        cmd = self._build_ssh(command)
-        try:
-            return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
-        except subprocess.CalledProcessError as ex:
-            RapidLog.exception('ssh returned exit status %d:\n%s'
-                    % (ex.returncode, ex.output.strip()))
+    def run_cmd(self, command):
+        self._sshclient.run_cmd(command)
+        return self._sshclient.get_output()
 
     def prox_sock(self, port=8474):
         """Connect to the PROX instance on remote system.
@@ -114,64 +104,13 @@ class prox_ctrl(object):
             return None
 
     def scp_put(self, src, dst):
-        """Copy src file from local system to dst on remote system."""
-        cmd = [ 'scp',
-                '-B',
-                '-oStrictHostKeyChecking=no',
-                '-oUserKnownHostsFile=/dev/null',
-                '-oLogLevel=ERROR' ]
-        if self._key is not None:
-            cmd.extend(['-i', self._key])
-        cmd.append(src)
-        remote = ''
-        if self._user is not None:
-            remote += self._user + '@'
-        remote += self._ip + ':' + dst
-        cmd.append(remote)
-        try:
-            # Actually ignore output on success, but capture stderr on failure
-            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
-        except subprocess.CalledProcessError as ex:
-            RapidLog.exception('scp returned exit status %d:\n%s'
-                    % (ex.returncode, ex.output.strip()))
+        self._sshclient.scp_put(src, dst)
+        RapidLog.info("Copying from {} to {}:{}".format(src, self._ip, dst))
 
     def scp_get(self, src, dst):
-        """Copy src file from remote system to dst on local system."""
-        cmd = [ 'scp',
-                '-B',
-                '-oStrictHostKeyChecking=no',
-                '-oUserKnownHostsFile=/dev/null',
-                '-oLogLevel=ERROR' ]
-        if self._key is not None:
-            cmd.extend(['-i', self._key])
-        remote = ''
-        if self._user is not None:
-            remote += self._user + '@'
-        remote += self._ip + ':/home/' + self._user + src
-        cmd.append(remote)
-        cmd.append(dst)
-        try:
-            # Actually ignore output on success, but capture stderr on failure
-            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
-        except subprocess.CalledProcessError as ex:
-            RapidLog.exception('scp returned exit status %d:\n%s'
-                    % (ex.returncode, ex.output.strip()))
-
-    def _build_ssh(self, command):
-        cmd = [ 'ssh',
-                '-oBatchMode=yes',
-                '-oStrictHostKeyChecking=no',
-                '-oUserKnownHostsFile=/dev/null',
-                '-oLogLevel=ERROR' ]
-        if self._key is not None:
-            cmd.extend(['-i', self._key])
-        remote = ''
-        if self._user is not None:
-            remote += self._user + '@'
-        remote += self._ip
-        cmd.append(remote)
-        cmd.append(command)
-        return cmd
+        self._sshclient.scp_get('/home/' + self._user + src, dst)
+        RapidLog.info("Copying from {}:/home/{}{} to {}".format(self._ip,
+            self._user, src, dst))
 
 class prox_sock(object):
     def __init__(self, sock):
@@ -201,8 +140,11 @@ class prox_sock(object):
         result = {}
         result['lat_min'] = 999999999
         result['lat_max'] = result['lat_avg'] = 0
-        number_tasks_returning_stats = 0
         result['buckets'] = [0] * 128
+        result['mis_ordered'] = 0
+        result['extent'] = 0
+        result['duplicate'] = 0
+        number_tasks_returning_stats = 0
         self._send('lat all stats %s %s' % (','.join(map(str, cores)),
             ','.join(map(str, tasks))))
         for core in cores:
@@ -220,15 +162,16 @@ class prox_sock(object):
             result['lat_avg'] += int(stats[2])
             #min_since begin = int(stats[3])
             #max_since_begin = int(stats[4])
-            result['lat_tsc'] = int(stats[5]) # Taking the last tsc as the timestamp since
-                                # PROX will return the same tsc for each 
-                                # core/task combination 
+            result['lat_tsc'] = int(stats[5])
+            # Taking the last tsc as the timestamp since
+            # PROX will return the same tsc for each
+            # core/task combination
             result['lat_hz'] = int(stats[6])
             #coreid = int(stats[7])
             #taskid = int(stats[8])
-            mis_ordered = int(stats[9])
-            extent = int(stats[10])
-            duplicate = int(stats[11])
+            result['mis_ordered'] += int(stats[9])
+            result['extent'] += int(stats[10])
+            result['duplicate'] += int(stats[11])
             stats = self._recv().split(':')
             if stats[0].startswith('error'):
                 RapidLog.critical("lat stats error: unexpected lat bucket \
@@ -238,7 +181,7 @@ class prox_sock(object):
             result['buckets'][0] = int(stats[1])
             for i in range(1, 128):
                 stats = self._recv().split(':')
-                result['buckets'][i] = int(stats[1])
+                result['buckets'][i] += int(stats[1])
         result['lat_avg'] = old_div(result['lat_avg'],
                 number_tasks_returning_stats)
         self._send('stats latency(0).used')
@@ -249,7 +192,7 @@ class prox_sock(object):
         return (result)
 
     def irq_stats(self, core, bucket, task=0):
-        self._send('stats task.core(%s).task(%s).irq(%s)' % 
+        self._send('stats task.core(%s).task(%s).irq(%s)' %
                 (core, task, bucket))
         stats = self._recv().split(',')
         return int(stats[0])
@@ -263,12 +206,12 @@ class prox_sock(object):
 
     def core_stats(self, cores, tasks=[0]):
         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
-        self._send('dp core stats %s %s' % (','.join(map(str, cores)), 
+        self._send('dp core stats %s %s' % (','.join(map(str, cores)),
             ','.join(map(str, tasks))))
         for core in cores:
             for task in tasks:
                 stats = self._recv().split(',')
-                if stats[0].startswith('error'):  
+                if stats[0].startswith('error'):
                     if stats[0].startswith('error: invalid syntax'):
                         RapidLog.critical("dp core stats error: unexpected \
                                 invalid syntax (potential incompatibility \
@@ -289,7 +232,7 @@ class prox_sock(object):
         rx = tx = port_id = tsc = no_mbufs = errors = 0
         self._send('multi port stats %s' % (','.join(map(str, ports))))
         result = self._recv().split(';')
-        if result[0].startswith('error'):  
+        if result[0].startswith('error'):
             RapidLog.critical("multi port stats error: unexpected invalid \
                     syntax (potential incompatibility between scripts and \
                     PROX)")
@@ -305,19 +248,19 @@ class prox_sock(object):
         return rx, tx, no_mbufs, errors, tsc
 
     def set_random(self, cores, task, offset, mask, length):
-        self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), 
+        self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
             task, offset, mask, length))
 
     def set_size(self, cores, task, pkt_size):
-        self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, 
+        self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
             pkt_size))
 
     def set_imix(self, cores, task, imix):
-        self._send('imix %s %s %s' % (','.join(map(str, cores)), task, 
+        self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
             ','.join(map(str,imix))))
 
     def set_value(self, cores, task, offset, value, length):
-        self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), 
+        self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
             task, offset, value, length))
 
     def quit_prox(self):
@@ -327,16 +270,24 @@ class prox_sock(object):
         """Append LF and send command to the PROX instance."""
         if self._sock is None:
             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
-        self._sock.sendall(cmd.encode() + b'\n')
+        try:
+            self._sock.sendall(cmd.encode() + b'\n')
+        except ConnectionResetError as e:
+            RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+            raise
 
     def _recv(self):
         """Receive response from PROX instance, return it with LF removed."""
         if self._sock is None:
             raise RuntimeError("PROX socket closed, cannot receive anymore")
-        pos = self._rcvd.find(b'\n')
-        while pos == -1:
-            self._rcvd += self._sock.recv(256)
+        try:
             pos = self._rcvd.find(b'\n')
-        rsp = self._rcvd[:pos]
-        self._rcvd = self._rcvd[pos+1:]
+            while pos == -1:
+                self._rcvd += self._sock.recv(256)
+                pos = self._rcvd.find(b'\n')
+            rsp = self._rcvd[:pos]
+            self._rcvd = self._rcvd[pos+1:]
+        except ConnectionResetError as e:
+            RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+            raise
         return rsp.decode()