Fix latency buckets with multiple lat cores 01/72501/4
author Luc Provoost <luc.provoost@intel.com>
Mon, 10 May 2021 14:04:22 +0000 (16:04 +0200)
committer Luc Provoost <luc.provoost@intel.com>
Wed, 12 May 2021 19:07:47 +0000 (19:07 +0000)
Only the bucket data of the last latency core was taken into account when
collecting the latency stats. We are now adding up the bucket data of all
cores.
Also changed the error reporting when the PROX instance breaks the pipe
connection (e.g. when the dataplane network is overloaded).
Cleaning up some trailing spaces in prox_ctrl.py

Change-Id: I09ba01ac65e7e4e9ff03ad47da83aa4f83250a67
Signed-off-by: Luc Provoost <luc.provoost@intel.com>
VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py
VNFs/DPPD-PROX/helper-scripts/rapid/rapid_log.py
VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py

index 8cd2e3f..40375c5 100644 (file)
@@ -25,7 +25,7 @@ import os
 import time
 import subprocess
 import socket
-from rapid_log import RapidLog 
+from rapid_log import RapidLog
 
 class prox_ctrl(object):
     def __init__(self, ip, key=None, user=None):
@@ -223,9 +223,10 @@ class prox_sock(object):
             result['lat_avg'] += int(stats[2])
             #min_since begin = int(stats[3])
             #max_since_begin = int(stats[4])
-            result['lat_tsc'] = int(stats[5]) # Taking the last tsc as the timestamp since
-                                # PROX will return the same tsc for each 
-                                # core/task combination 
+            result['lat_tsc'] = int(stats[5])
+            # Taking the last tsc as the timestamp since
+            # PROX will return the same tsc for each
+            # core/task combination
             result['lat_hz'] = int(stats[6])
             #coreid = int(stats[7])
             #taskid = int(stats[8])
@@ -241,7 +242,7 @@ class prox_sock(object):
             result['buckets'][0] = int(stats[1])
             for i in range(1, 128):
                 stats = self._recv().split(':')
-                result['buckets'][i] = int(stats[1])
+                result['buckets'][i] += int(stats[1])
         result['lat_avg'] = old_div(result['lat_avg'],
                 number_tasks_returning_stats)
         self._send('stats latency(0).used')
@@ -252,7 +253,7 @@ class prox_sock(object):
         return (result)
 
     def irq_stats(self, core, bucket, task=0):
-        self._send('stats task.core(%s).task(%s).irq(%s)' % 
+        self._send('stats task.core(%s).task(%s).irq(%s)' %
                 (core, task, bucket))
         stats = self._recv().split(',')
         return int(stats[0])
@@ -266,12 +267,12 @@ class prox_sock(object):
 
     def core_stats(self, cores, tasks=[0]):
         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
-        self._send('dp core stats %s %s' % (','.join(map(str, cores)), 
+        self._send('dp core stats %s %s' % (','.join(map(str, cores)),
             ','.join(map(str, tasks))))
         for core in cores:
             for task in tasks:
                 stats = self._recv().split(',')
-                if stats[0].startswith('error'):  
+                if stats[0].startswith('error'):
                     if stats[0].startswith('error: invalid syntax'):
                         RapidLog.critical("dp core stats error: unexpected \
                                 invalid syntax (potential incompatibility \
@@ -292,7 +293,7 @@ class prox_sock(object):
         rx = tx = port_id = tsc = no_mbufs = errors = 0
         self._send('multi port stats %s' % (','.join(map(str, ports))))
         result = self._recv().split(';')
-        if result[0].startswith('error'):  
+        if result[0].startswith('error'):
             RapidLog.critical("multi port stats error: unexpected invalid \
                     syntax (potential incompatibility between scripts and \
                     PROX)")
@@ -308,19 +309,19 @@ class prox_sock(object):
         return rx, tx, no_mbufs, errors, tsc
 
     def set_random(self, cores, task, offset, mask, length):
-        self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), 
+        self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
             task, offset, mask, length))
 
     def set_size(self, cores, task, pkt_size):
-        self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, 
+        self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
             pkt_size))
 
     def set_imix(self, cores, task, imix):
-        self._send('imix %s %s %s' % (','.join(map(str, cores)), task, 
+        self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
             ','.join(map(str,imix))))
 
     def set_value(self, cores, task, offset, value, length):
-        self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), 
+        self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
             task, offset, value, length))
 
     def quit_prox(self):
@@ -330,16 +331,24 @@ class prox_sock(object):
         """Append LF and send command to the PROX instance."""
         if self._sock is None:
             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
-        self._sock.sendall(cmd.encode() + b'\n')
+        try:
+            self._sock.sendall(cmd.encode() + b'\n')
+        except ConnectionResetError as e:
+            RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+            raise
 
     def _recv(self):
         """Receive response from PROX instance, return it with LF removed."""
         if self._sock is None:
             raise RuntimeError("PROX socket closed, cannot receive anymore")
-        pos = self._rcvd.find(b'\n')
-        while pos == -1:
-            self._rcvd += self._sock.recv(256)
+        try:
             pos = self._rcvd.find(b'\n')
-        rsp = self._rcvd[:pos]
-        self._rcvd = self._rcvd[pos+1:]
+            while pos == -1:
+                self._rcvd += self._sock.recv(256)
+                pos = self._rcvd.find(b'\n')
+            rsp = self._rcvd[:pos]
+            self._rcvd = self._rcvd[pos+1:]
+        except ConnectionResetError as e:
+            RapidLog.error('Pipe reset by Prox instance: traffic too high?')
+            raise
         return rsp.decode()
index f453c57..1ad5427 100644 (file)
@@ -120,11 +120,12 @@ class RapidLog(object):
     @staticmethod
     def exception(exception_info):
         RapidLog.log.exception(exception_info)
-        raise Exception(exception_info)
+        exit(1)
 
+    @staticmethod
     def critical(critical_info):
         RapidLog.log.critical(critical_info)
-        raise Exception(critical_info)
+        exit(1)
 
     @staticmethod
     def error(error_info):
index 33fb8df..5f78ec0 100755 (executable)
@@ -87,52 +87,74 @@ class RapidTestManager(object):
                         if machine_params['prox_socket']:
                             sut_machine = machine
             self.machines.append(machine)
-        prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines))
-        self.future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in self.machines}
-        if configonly:
-            concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED)
-            sys.exit()
-        with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines)) as executor:
-            future_to_connect_prox = {executor.submit(machine.connect_prox): machine for machine in self.machines}
+        try:
+            prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines))
+            self.future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in self.machines}
+            if configonly:
+                concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED)
+                sys.exit()
+            socket_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(self.machines))
+            future_to_connect_prox = {socket_executor.submit(machine.connect_prox): machine for machine in self.machines}
             concurrent.futures.wait(future_to_connect_prox,return_when=ALL_COMPLETED)
-        result = 0
-        for test_param in test_params['tests']:
-            RapidLog.info(test_param['test'])
-            if test_param['test'] in ['flowsizetest', 'TST009test',
-                    'fixed_rate', 'increment_till_fail']:
-                test = FlowSizeTest(test_param, test_params['lat_percentile'],
-                        test_params['runtime'], 
-                        test_params['TestName'], 
-                        test_params['environment_file'], gen_machine,
-                        sut_machine, background_machines)
-            elif test_param['test'] in ['corestatstest']:
-                test = CoreStatsTest(test_param, test_params['runtime'],
-                        test_params['TestName'], 
-                        test_params['environment_file'], self.machines)
-            elif test_param['test'] in ['portstatstest']:
-                test = PortStatsTest(test_param, test_params['runtime'],
-                        test_params['TestName'], 
-                        test_params['environment_file'], self.machines)
-            elif test_param['test'] in ['impairtest']:
-                test = ImpairTest(test_param, test_params['lat_percentile'],
-                        test_params['runtime'],
-                        test_params['TestName'], 
-                        test_params['environment_file'], gen_machine,
-                        sut_machine, background_machines)
-            elif test_param['test'] in ['irqtest']:
-                test = IrqTest(test_param, test_params['runtime'],
-                        test_params['TestName'], 
-                        test_params['environment_file'], self.machines)
-            elif test_param['test'] in ['warmuptest']:
-                test = WarmupTest(test_param, gen_machine)
-            else:
-                RapidLog.debug('Test name ({}) is not valid:'.format(
-                    test_param['test']))
-            single_test_result, result_details = test.run()
-            result = result + single_test_result
-        for machine in self.machines:
-            machine.close_prox()
-        concurrent.futures.wait(self.future_to_prox,return_when=ALL_COMPLETED)
+            result = 0
+            for test_param in test_params['tests']:
+                RapidLog.info(test_param['test'])
+                if test_param['test'] in ['flowsizetest', 'TST009test',
+                        'fixed_rate', 'increment_till_fail']:
+                    test = FlowSizeTest(test_param,
+                            test_params['lat_percentile'],
+                            test_params['runtime'],
+                            test_params['TestName'],
+                            test_params['environment_file'],
+                            gen_machine,
+                            sut_machine, background_machines)
+                elif test_param['test'] in ['corestatstest']:
+                    test = CoreStatsTest(test_param,
+                            test_params['runtime'],
+                            test_params['TestName'],
+                            test_params['environment_file'],
+                            self.machines)
+                elif test_param['test'] in ['portstatstest']:
+                    test = PortStatsTest(test_param,
+                            test_params['runtime'],
+                            test_params['TestName'],
+                            test_params['environment_file'],
+                            self.machines)
+                elif test_param['test'] in ['impairtest']:
+                    test = ImpairTest(test_param,
+                            test_params['lat_percentile'],
+                            test_params['runtime'],
+                            test_params['TestName'],
+                            test_params['environment_file'],
+                            gen_machine,
+                            sut_machine, background_machines)
+                elif test_param['test'] in ['irqtest']:
+                    test = IrqTest(test_param,
+                            test_params['runtime'],
+                            test_params['TestName'],
+                            test_params['environment_file'],
+                            self.machines)
+                elif test_param['test'] in ['warmuptest']:
+                    test = WarmupTest(test_param,
+                            gen_machine)
+                else:
+                    RapidLog.debug('Test name ({}) is not valid:'.format(
+                        test_param['test']))
+                single_test_result, result_details = test.run()
+                result = result + single_test_result
+            for machine in self.machines:
+                machine.close_prox()
+            concurrent.futures.wait(self.future_to_prox,
+                    return_when=ALL_COMPLETED)
+        except (ConnectionError, KeyboardInterrupt) as e:
+            result = result_details = None
+            socket_executor.shutdown(wait=False)
+            socket_executor._threads.clear()
+            prox_executor.shutdown(wait=False)
+            prox_executor._threads.clear()
+            concurrent.futures.thread._threads_queues.clear()
+            RapidLog.error("Test interrupted: {} {}".format(
+                type(e).__name__,e))
         return (result, result_details)
 
 def main():