NFVBENCH-25 Send run results to fluentd 61/45961/1
authorKerim Gokarslan <kgokarsl@cisco.com>
Thu, 14 Sep 2017 03:26:01 +0000 (20:26 -0700)
committerahothan <ahothan@cisco.com>
Fri, 20 Oct 2017 07:37:13 +0000 (00:37 -0700)
Change-Id: I671a9297b90784bc30eee48ea9244a9c63a24e85
Signed-off-by: Kerim Gokarslan <kgokarsl@cisco.com>
nfvbench/fluentd.py
nfvbench/nfvbench.py
nfvbench/nfvbenchd.py
nfvbench/summarizer.py
nfvbench/traffic_client.py
nfvbench/traffic_gen/traffic_utils.py
nfvbench/traffic_gen/trex.py

index 93c90fa..a9bda62 100644 (file)
@@ -61,6 +61,10 @@ class FluentLogHandler(logging.Handler):
         self.__update_stats(record.levelno)
         self.sender.emit(None, data)
 
+    # this function is called by summarizer
+    def record_send(self, record):
+        self.sender.emit(None, record)
+
     # send START record for each run
     def __send_start_record(self):
         data = {
index 67b953f..920838a 100644 (file)
@@ -127,9 +127,13 @@ class NFVBench(object):
                     'error_message': message
                 }
 
-    def print_summary(self, result):
-        """Print summary of the result"""
-        summary = NFVBenchSummarizer(result)
+    def prepare_summary(self, result):
+        """Prepares summary of the result to print and send it to logger (eg: fluentd)"""
+        sender = FluentLogHandler("resultnfvbench",
+                                  fluentd_ip=self.config.fluentd.ip,
+                                  fluentd_port=self.config.fluentd.port) \
+            if self.config.fluentd.logging_tag else None
+        summary = NFVBenchSummarizer(result, sender)
         LOG.info(str(summary))
 
     def save(self, result):
@@ -453,7 +457,7 @@ def main():
 
         if opts.summary:
             with open(opts.summary) as json_data:
-                print NFVBenchSummarizer(json.load(json_data))
+                print NFVBenchSummarizer(json.load(json_data), None)
             sys.exit(0)
 
         # show default config in text/yaml format
@@ -539,7 +543,7 @@ def main():
 
                 if 'result' in result and result['status']:
                     nfvbench.save(result['result'])
-                    nfvbench.print_summary(result['result'])
+                    nfvbench.prepare_summary(result['result'])
     except Exception as exc:
         run_summary_required = True
         LOG.error({
index 3534950..3ab30de 100644 (file)
@@ -21,6 +21,8 @@ from flask import request
 
 from flask_socketio import emit
 from flask_socketio import SocketIO
+from fluentd import FluentLogHandler
+from summarizer import NFVBenchSummarizer
 
 import json
 from log import LOG
@@ -211,10 +213,15 @@ class WebSocketIoServer(object):
     notifications using websocket events (send_ methods). Caller should simply create an instance
     of this class and pass a runner object then invoke the run method
     """
+
     def __init__(self, http_root, runner, logger):
         self.nfvbench_runner = runner
         setup_flask(http_root)
         self.fluent_logger = logger
+        self.result_fluent_logger = FluentLogHandler("resultnfvbench",
+                                                     fluentd_ip=self.fluent_logger.sender.host,
+                                                     fluentd_port=self.fluent_logger.sender.port) \
+            if self.fluent_logger else None
 
     def run(self, host='127.0.0.1', port=7556):
 
@@ -246,8 +253,11 @@ class WebSocketIoServer(object):
             else:
                 # this might overwrite a previously unfetched result
                 Ctx.set_result(results)
+            summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
+            LOG.info(str(summary))
             Ctx.release()
-            self.fluent_logger.send_run_summary(True)
+            if self.fluent_logger:
+                self.fluent_logger.send_run_summary(True)
 
     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
index 4ee2426..1eaa8d6 100644 (file)
@@ -16,7 +16,9 @@
 
 import bitmath
 from contextlib import contextmanager
+from datetime import datetime
 import math
+import pytz
 from specs import ChainType
 from tabulate import tabulate
 
@@ -90,7 +92,7 @@ class Table(object):
         self.columns = len(header_row)
 
     def add_row(self, row):
-        assert(self.columns == len(row))
+        assert (self.columns == len(row))
         formatted_row = []
         for entry, formatter in zip(row, self.formatters):
             formatted_row.append(formatter(entry))
@@ -121,7 +123,7 @@ class Summarizer(object):
         self.marker_stack.append(marker)
 
     def __unindent(self):
-        assert(self.indent_size >= self.indent_per_level)
+        assert (self.indent_size >= self.indent_per_level)
         self.indent_size -= self.indent_per_level
         self.marker_stack.pop()
 
@@ -208,10 +210,16 @@ class NFVBenchSummarizer(Summarizer):
     direction_keys = ['direction-forward', 'direction-reverse', 'direction-total']
     direction_names = ['Forward', 'Reverse', 'Total']
 
-    def __init__(self, result):
+    def __init__(self, result, sender):
         Summarizer.__init__(self)
         self.result = result
         self.config = self.result['config']
+        self.record_header = None
+        self.record_data = None
+        self.sender = sender
+        # if sender is available initialize record
+        if self.sender:
+            self.__record_init()
         self.__summarize()
 
     def __summarize(self):
@@ -223,6 +231,9 @@ class NFVBenchSummarizer(Summarizer):
             'vSwitch': self.result['openstack_spec']['vswitch'],
             'Encapsulation': self.result['openstack_spec']['encaps']
         })
+        self.__record_header_put('version', self.result['nfvbench_version'])
+        self.__record_header_put('vSwitch', self.result['openstack_spec']['vswitch'])
+        self.__record_header_put('Encapsulation', self.result['openstack_spec']['encaps'])
         self._put('Benchmarks:')
         with self._create_block():
             self._put('Networks:')
@@ -251,7 +262,6 @@ class NFVBenchSummarizer(Summarizer):
                             self._put('NDR:', self.config['measurement']['NDR'])
                         if self.config['pdr_run']:
                             self._put('PDR:', self.config['measurement']['PDR'])
-
                 self._put('Service chain:')
                 for result in network_benchmark['service_chain'].iteritems():
                     with self._create_block():
@@ -261,6 +271,8 @@ class NFVBenchSummarizer(Summarizer):
         self._put(chain_name + ':')
         if chain_name == ChainType.PVVP:
             self._put('Mode:', chain_benchmark.get('mode'))
+            chain_name += "-" + chain_benchmark.get('mode')
+        self.__record_header_put('service_chain', chain_name)
         with self._create_block():
             self._put('Traffic:')
             with self._create_block(False):
@@ -272,6 +284,12 @@ class NFVBenchSummarizer(Summarizer):
         self._put('Flow count:', traffic_benchmark['flow_count'])
         self._put('Service chains count:', traffic_benchmark['service_chain_count'])
         self._put('Compute nodes:', traffic_benchmark['compute_nodes'].keys())
+
+        self.__record_header_put('profile', traffic_benchmark['profile'])
+        self.__record_header_put('bidirectional', traffic_benchmark['bidirectional'])
+        self.__record_header_put('flow_count', traffic_benchmark['flow_count'])
+        self.__record_header_put('sc_count', traffic_benchmark['service_chain_count'])
+        self.__record_header_put('compute_nodes', traffic_benchmark['compute_nodes'].keys())
         with self._create_block(False):
             self._put()
             if not self.config['no_traffic']:
@@ -289,6 +307,7 @@ class NFVBenchSummarizer(Summarizer):
                 if 'warning' in entry:
                     continue
                 self.__chain_analysis_summarize(*entry)
+                self.__record_send()
 
     def __chain_analysis_summarize(self, frame_size, analysis):
         self._put()
@@ -296,19 +315,25 @@ class NFVBenchSummarizer(Summarizer):
         if 'analysis_duration_sec' in analysis:
             self._put('Chain analysis duration:',
                       Formatter.float(3)(analysis['analysis_duration_sec']), 'seconds')
+            self.__record_data_put(frame_size, {'chain_analysis_duration': Formatter.float(3)(
+                analysis['analysis_duration_sec'])})
         if self.config['ndr_run']:
             self._put('NDR search duration:', Formatter.float(0)(analysis['ndr']['time_taken_sec']),
                       'seconds')
+            self.__record_data_put(frame_size, {'ndr_search_duration': Formatter.float(0)(
+                analysis['ndr']['time_taken_sec'])})
         if self.config['pdr_run']:
             self._put('PDR search duration:', Formatter.float(0)(analysis['pdr']['time_taken_sec']),
                       'seconds')
+            self.__record_data_put(frame_size, {'pdr_search_duration': Formatter.float(0)(
+                analysis['pdr']['time_taken_sec'])})
         self._put()
 
         if not self.config['no_traffic'] and self.config['single_run']:
             self._put('Run Config:')
             self._put()
             with self._create_block(False):
-                self._put_table(self.__get_config_table(analysis['run_config']))
+                self._put_table(self.__get_config_table(analysis['run_config'], frame_size))
                 if 'warning' in analysis['run_config'] and analysis['run_config']['warning']:
                     self._put()
                     self._put(analysis['run_config']['warning'])
@@ -335,12 +360,21 @@ class NFVBenchSummarizer(Summarizer):
                     'NDR',
                     frame_size,
                     analysis['ndr']['rate_bps'],
-                    int(analysis['ndr']['rate_pps']),
+                    analysis['ndr']['rate_pps'],
                     analysis['ndr']['stats']['overall']['drop_percentage'],
                     analysis['ndr']['stats']['overall']['avg_delay_usec'],
                     analysis['ndr']['stats']['overall']['min_delay_usec'],
                     analysis['ndr']['stats']['overall']['max_delay_usec']
                 ])
+                self.__record_data_put(frame_size, {'ndr': {
+                    'type': 'NDR',
+                    'rate_bps': analysis['ndr']['rate_bps'],
+                    'rate_pps': analysis['ndr']['rate_pps'],
+                    'drop_percantage': analysis['ndr']['stats']['overall']['drop_percentage'],
+                    'avg_delay_usec': analysis['ndr']['stats']['overall']['avg_delay_usec'],
+                    'min_delay_usec': analysis['ndr']['stats']['overall']['min_delay_usec'],
+                    'max_delay_usec': analysis['ndr']['stats']['overall']['max_delay_usec']
+                }})
         if self.config['pdr_run']:
             for frame_size, analysis in traffic_result.iteritems():
                 if frame_size == 'warning':
@@ -349,12 +383,21 @@ class NFVBenchSummarizer(Summarizer):
                     'PDR',
                     frame_size,
                     analysis['pdr']['rate_bps'],
-                    int(analysis['pdr']['rate_pps']),
+                    analysis['pdr']['rate_pps'],
                     analysis['pdr']['stats']['overall']['drop_percentage'],
                     analysis['pdr']['stats']['overall']['avg_delay_usec'],
                     analysis['pdr']['stats']['overall']['min_delay_usec'],
                     analysis['pdr']['stats']['overall']['max_delay_usec']
                 ])
+                self.__record_data_put(frame_size, {'pdr': {
+                    'type': 'PDR',
+                    'rate_bps': analysis['pdr']['rate_bps'],
+                    'rate_pps': analysis['pdr']['rate_pps'],
+                    'drop_percantage': analysis['pdr']['stats']['overall']['drop_percentage'],
+                    'avg_delay_usec': analysis['pdr']['stats']['overall']['avg_delay_usec'],
+                    'min_delay_usec': analysis['pdr']['stats']['overall']['min_delay_usec'],
+                    'max_delay_usec': analysis['pdr']['stats']['overall']['max_delay_usec']
+                }})
         if self.config['single_run']:
             for frame_size, analysis in traffic_result.iteritems():
                 summary_table.add_row([
@@ -364,9 +407,16 @@ class NFVBenchSummarizer(Summarizer):
                     analysis['stats']['overall']['rx']['min_delay_usec'],
                     analysis['stats']['overall']['rx']['max_delay_usec']
                 ])
+                self.__record_data_put(frame_size, {'single_run': {
+                    'type': 'single_run',
+                    'drop_rate_percent': analysis['stats']['overall']['drop_rate_percent'],
+                    'avg_delay_usec': analysis['stats']['overall']['rx']['avg_delay_usec'],
+                    'min_delay_usec': analysis['stats']['overall']['rx']['min_delay_usec'],
+                    'max_delay_usec': analysis['stats']['overall']['rx']['max_delay_usec']
+                }})
         return summary_table
 
-    def __get_config_table(self, run_config):
+    def __get_config_table(self, run_config, frame_size):
         config_table = Table(self.config_header)
         for key, name in zip(self.direction_keys, self.direction_names):
             if key not in run_config:
@@ -380,6 +430,15 @@ class NFVBenchSummarizer(Summarizer):
                 int(run_config[key]['tx']['rate_pps']),
                 int(run_config[key]['rx']['rate_pps']),
             ])
+            self.__record_data_put(frame_size, {
+                name.lower() + "_orig_rate_bps": int(run_config[key]['orig']['rate_bps']),
+                name.lower() + "_tx_rate_bps": int(run_config[key]['tx']['rate_bps']),
+                name.lower() + "_rx_rate_bps": int(run_config[key]['rx']['rate_bps']),
+                name.lower() + "_orig_rate_pps": int(run_config[key]['orig']['rate_pps']),
+                name.lower() + "_tx_rate_pps": int(run_config[key]['tx']['rate_pps']),
+                name.lower() + "_rx_rate_pps": int(run_config[key]['rx']['rate_pps']),
+
+            })
         return config_table
 
     def __get_chain_analysis_table(self, packet_analysis):
@@ -387,7 +446,6 @@ class NFVBenchSummarizer(Summarizer):
         forward_analysis = packet_analysis['direction-forward']
         reverse_analysis = packet_analysis['direction-reverse']
         reverse_analysis.reverse()
-
         for fwd, rev in zip(forward_analysis, reverse_analysis):
             chain_analysis_table.add_row([
                 fwd['interface'],
@@ -400,3 +458,47 @@ class NFVBenchSummarizer(Summarizer):
                 rev.get('packet_drop_percentage', None),
             ])
         return chain_analysis_table
+
+    def __record_header_put(self, key, value):
+        if self.sender:
+            self.record_header[key] = value
+
+    def __record_data_put(self, key, data):
+        if self.sender:
+            if key not in self.record_data:
+                self.record_data[key] = {}
+            self.record_data[key].update(data)
+
+    def __record_send(self):
+        if self.sender:
+            self.record_header["@timestamp"] = datetime.utcnow().replace(
+                tzinfo=pytz.utc).strftime(
+                "%Y-%m-%dT%H:%M:%S.%f%z")
+            for frame_size in self.record_data:
+                data = self.record_header
+                data['frame_size'] = frame_size
+                data.update(self.record_data[frame_size])
+                run_specific_data = {}
+                if 'single_run' in data:
+                    run_specific_data['single_run'] = data['single_run']
+                    del data['single_run']
+                if 'ndr' in data:
+                    run_specific_data['ndr'] = data['ndr']
+                    run_specific_data['ndr']['drop_limit'] = self.config['measurement']['NDR']
+                    del data['ndr']
+                if 'pdr' in data:
+                    run_specific_data['pdr'] = data['pdr']
+                    run_specific_data['pdr']['drop_limit'] = self.config['measurement']['PDR']
+                    del data['pdr']
+                for key in run_specific_data:
+                    data_to_send = data.copy()
+                    data_to_send.update(run_specific_data[key])
+                    self.sender.record_send(data_to_send)
+            self.__record_init()
+
+    def __record_init(self):
+        # init is called after checking for sender
+        self.record_header = {
+            "runlogdate": self.sender.runlogdate,
+        }
+        self.record_data = {}
index 7c8367a..27ff227 100644 (file)
@@ -537,9 +537,9 @@ class TrafficClient(object):
                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
                 except ValueError:
                     retDict[port]['rx'][key] = 0
-            retDict[port]['rx']['avg_delay_usec'] = float(stats[port]['rx']['avg_delay_usec'])
-            retDict[port]['rx']['min_delay_usec'] = float(stats[port]['rx']['min_delay_usec'])
-            retDict[port]['rx']['max_delay_usec'] = float(stats[port]['rx']['max_delay_usec'])
+            retDict[port]['rx']['avg_delay_usec'] = int(stats[port]['rx']['avg_delay_usec'])
+            retDict[port]['rx']['min_delay_usec'] = int(stats[port]['rx']['min_delay_usec'])
+            retDict[port]['rx']['max_delay_usec'] = int(stats[port]['rx']['max_delay_usec'])
             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
 
         ports = sorted(retDict.keys())
index e5dc463..d3def42 100644 (file)
@@ -39,9 +39,9 @@ def convert_rates(l2frame_size, rate, intf_speed):
 
     return {
         'initial_rate_type': initial_rate_type,
-        'rate_pps': pps,
+        'rate_pps': int(pps),
         'rate_percent': load,
-        'rate_bps': bps
+        'rate_bps': int(bps)
     }
 
 
index 8aca290..a9effab 100644 (file)
@@ -27,7 +27,6 @@ from traffic_base import AbstractTrafficGenerator
 from traffic_base import TrafficGeneratorException
 import traffic_utils as utils
 
-
 from trex_stl_lib.api import CTRexVmInsFixHwCs
 from trex_stl_lib.api import Dot1Q
 from trex_stl_lib.api import Ether
@@ -49,7 +48,6 @@ from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
 
 
 class TRex(AbstractTrafficGenerator):
-
     LATENCY_PPS = 1000
 
     def __init__(self, runner):
@@ -75,27 +73,30 @@ class TRex(AbstractTrafficGenerator):
             stats = self.__combine_stats(in_stats, ph)
             result[ph] = {
                 'tx': {
-                    'total_pkts': stats['tx_pkts']['total'],
-                    'total_pkt_bytes': stats['tx_bytes']['total'],
-                    'pkt_rate': stats['tx_pps']['total'],
-                    'pkt_bit_rate': stats['tx_bps']['total']
+                    'total_pkts': int(stats['tx_pkts']['total']),
+                    'total_pkt_bytes': int(stats['tx_bytes']['total']),
+                    'pkt_rate': int(stats['tx_pps']['total']),
+                    'pkt_bit_rate': int(stats['tx_bps']['total'])
                 },
                 'rx': {
-                    'total_pkts': stats['rx_pkts']['total'],
-                    'total_pkt_bytes': stats['rx_bytes']['total'],
-                    'pkt_rate': stats['rx_pps']['total'],
-                    'pkt_bit_rate': stats['rx_bps']['total'],
-                    'dropped_pkts': stats['tx_pkts']['total'] - stats['rx_pkts']['total']
+                    'total_pkts': int(stats['rx_pkts']['total']),
+                    'total_pkt_bytes': int(stats['rx_bytes']['total']),
+                    'pkt_rate': int(stats['rx_pps']['total']),
+                    'pkt_bit_rate': int(stats['rx_bps']['total']),
+                    'dropped_pkts': int(stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
                 }
             }
 
             lat = self.__combine_latencies(in_stats, ph)
-            result[ph]['rx']['max_delay_usec'] = lat.get('total_max', float('nan'))
-            result[ph]['rx']['min_delay_usec'] = lat.get('total_min', float('nan'))
-            result[ph]['rx']['avg_delay_usec'] = lat.get('average', float('nan'))
-
+            result[ph]['rx']['max_delay_usec'] = int(
+                lat['total_max']) if 'total_max' in lat else float('nan')
+            result[ph]['rx']['min_delay_usec'] = int(
+                lat['total_min']) if 'total_min' in lat else float('nan')
+            result[ph]['rx']['avg_delay_usec'] = int(
+                lat['average']) if 'average' in lat else float(
+                'nan')
         total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
-        result["total_tx_rate"] = total_tx_pkts / self.config.duration_sec
+        result["total_tx_rate"] = int(total_tx_pkts / self.config.duration_sec)
         return result
 
     def __combine_stats(self, in_stats, port_handle):