NFVBENCH-12 Add run summary to fluentd stream 83/40783/3
authorKerim Gokarslan <kgokarsl@cisco.com>
Thu, 31 Aug 2017 19:39:07 +0000 (12:39 -0700)
committerKerim Gokarslan <kgokarsl@cisco.com>
Thu, 31 Aug 2017 20:50:57 +0000 (13:50 -0700)
NFVBENCH-14 Fluentd server IP adress from the config is not picked up when configured
NFVBENCH-16 At the start of each run send a new fluent record with the config

Change-Id: I2d76ecee5b1b93dad4eeccd67d5ed906a0a8faba
Signed-off-by: Kerim Gokarslan <kgokarsl@cisco.com>
docs/testing/user/userguide/server.rst
nfvbench/fluentd.py
nfvbench/nfvbench.py
nfvbench/nfvbenchd.py

index 4495e19..ebdd828 100644 (file)
@@ -42,12 +42,15 @@ HTTP Interface
 <http-url>/echo (GET)
 ^^^^^^^^^^^^^^^^^^^^^
 
-This request simply returns whatever content is sent in the body of the request (only used for testing)
+This request simply returns whatever content is sent in the body of the request (body should be in json format, only used for testing)
 
-<http-url>/start_run (POST)
-^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Example request: curl -XGET '127.0.0.1:7556/echo' -H "Content-Type: application/json" -d '{"vmtp": "test"}'
+Response:
+{
+  "vmtp": "test"
+}
 
-This request will initiate a new NFVbench run asynchornously and can optionally pass the NFVbench configuration to run in the body (in JSON format).
+ls
 See "NFVbench configuration JSON parameter" below for details on how to format this parameter.
 
 The request returns immediately with a json content indicating if there was an error (status=ERROR) or if the request was submitted successfully (status=PENDING). Example of return when the submission is successful:
@@ -147,7 +150,7 @@ The entire default configuration can be viewed using the --show-json-config opti
 
 .. code-block:: bash
 
-    # nfvbench --show-json-config
+    # nfvbench --show-config
     {
         "availability_zone": null,
         "compute_node_user": "root",
index 683d4ce..5bfcebe 100644 (file)
@@ -16,6 +16,7 @@ from datetime import datetime
 from fluent import sender
 import logging
 
+
 class FluentLogHandler(logging.Handler):
     '''This is a minimalist log handler for use with Fluentd
 
@@ -26,17 +27,25 @@ class FluentLogHandler(logging.Handler):
     - the runlogdate (to tie multiple run-related logs together)
     The timestamp is retrieved by the fluentd library.
     '''
+
     def __init__(self, tag, fluentd_ip='127.0.0.1', fluentd_port=24224):
         logging.Handler.__init__(self)
         self.tag = tag
         self.formatter = logging.Formatter('%(message)s')
-        self.sender = sender.FluentSender(self.tag, port=fluentd_port)
-        self.start_new_run()
+        self.sender = sender.FluentSender(self.tag, host=fluentd_ip, port=fluentd_port)
+        self.runlogdate = 0
+        self.__warning_counter = 0
+        self.__error_counter = 0
 
     def start_new_run(self):
         '''Delimitate a new run in the stream of records with a new timestamp
         '''
         self.runlogdate = str(datetime.now())
+        # reset counters
+        self.__warning_counter = 0
+        self.__error_counter = 0
+        # send start record
+        self.__send_start_record()
 
     def emit(self, record):
         data = {
@@ -44,4 +53,52 @@ class FluentLogHandler(logging.Handler):
             "loglevel": record.levelname,
             "message": self.formatter.format(record)
         }
+        self.__update_stats(record.levelno)
         self.sender.emit(None, data)
+
+    # send START record for each run
+    def __send_start_record(self):
+        data = {
+            "runlogdate": self.runlogdate,
+            "loglevel": "START",
+            "message": "NFVBENCH run is started",
+            "numloglevel": 0,
+            "numerrors": 0,
+            "numwarnings": 0
+        }
+        self.sender.emit(None, data)
+
+    # send stats related to the current run and reset state for a new run
+    def send_run_summary(self, run_summary_required):
+        if run_summary_required or self.__get_highest_level() == logging.ERROR:
+            data = {
+                "runlogdate": self.runlogdate,
+                "loglevel": "RUN_SUMMARY",
+                "message": self.__get_highest_level_desc(),
+                "numloglevel": self.__get_highest_level(),
+                "numerrors": self.__error_counter,
+                "numwarnings": self.__warning_counter
+            }
+            self.sender.emit(None, data)
+
+    def __get_highest_level(self):
+        if self.__error_counter > 0:
+            return logging.ERROR
+        elif self.__warning_counter > 0:
+            return logging.WARNING
+        return logging.INFO
+
+    def __get_highest_level_desc(self):
+        highest_level = self.__get_highest_level()
+        if highest_level == logging.INFO:
+            return "GOOD RUN"
+        elif highest_level == logging.WARNING:
+            return "RUN WITH WARNINGS"
+        else:
+            return "RUN WITH ERRORS"
+
+    def __update_stats(self, levelno):
+        if levelno == logging.WARNING:
+            self.__warning_counter += 1
+        elif levelno == logging.ERROR:
+            self.__error_counter += 1
index b36d328..37645aa 100644 (file)
@@ -44,6 +44,7 @@ import utils
 
 fluent_logger = None
 
+
 class NFVBench(object):
     """Main class of NFV benchmarking tool."""
     STATUS_OK = 'OK'
@@ -75,7 +76,7 @@ class NFVBench(object):
     def set_notifier(self, notifier):
         self.notifier = notifier
 
-    def run(self, opts):
+    def run(self, opts, args):
         status = NFVBench.STATUS_OK
         result = None
         message = ''
@@ -83,6 +84,7 @@ class NFVBench(object):
             # take a snapshot of the current time for this new run
             # so that all subsequent logs can relate to this run
             fluent_logger.start_new_run()
+        LOG.info(args)
         try:
             self.update_config(opts)
             self.setup()
@@ -407,6 +409,7 @@ def override_custom_traffic(config, frame_sizes, unidir):
         "profile": traffic_profile_name
     }
 
+
 def check_physnet(name, netattrs):
     if not netattrs.physical_network:
         raise Exception("SRIOV requires physical_network to be specified for the {n} network"
@@ -415,8 +418,10 @@ def check_physnet(name, netattrs):
         raise Exception("SRIOV requires segmentation_id to be specified for the {n} network"
                         .format(n=name))
 
+
 def main():
     global fluent_logger
+    run_summary_required = False
     try:
         log.setup()
         # load default config file
@@ -508,7 +513,7 @@ def main():
 
         if opts.server:
             if os.path.isdir(opts.server):
-                server = WebSocketIoServer(opts.server, nfvbench)
+                server = WebSocketIoServer(opts.server, nfvbench, fluent_logger)
                 nfvbench.set_notifier(server)
                 try:
                     port = int(opts.port)
@@ -521,6 +526,7 @@ def main():
                 sys.exit(1)
         else:
             with utils.RunLock():
+                run_summary_required = True
                 if unknown_opts:
                     err_msg = 'Unknown options: ' + ' '.join(unknown_opts)
                     LOG.error(err_msg)
@@ -528,7 +534,9 @@ def main():
 
                 # remove unfilled values
                 opts = {k: v for k, v in vars(opts).iteritems() if v is not None}
-                result = nfvbench.run(opts)
+                # get CLI args
+                params = ' '.join(str(e) for e in sys.argv[1:])
+                result = nfvbench.run(opts, params)
                 if 'error_message' in result:
                     raise Exception(result['error_message'])
 
@@ -536,12 +544,18 @@ def main():
                     nfvbench.save(result['result'])
                     nfvbench.print_summary(result['result'])
     except Exception as exc:
+        run_summary_required = True
         LOG.error({
             'status': NFVBench.STATUS_ERROR,
             'error_message': traceback.format_exc()
         })
         print str(exc)
-        sys.exit(1)
+    finally:
+        if fluent_logger:
+            # only send a summary record if there was an actual nfvbench run or
+            # if an error/exception was logged.
+            fluent_logger.send_run_summary(run_summary_required)
+
 
 if __name__ == '__main__':
     main()
index aef896a..4bbd69d 100644 (file)
@@ -23,6 +23,7 @@ from flask_socketio import emit
 from flask_socketio import SocketIO
 
 import json
+from log import LOG
 import Queue
 import traceback
 from utils import byteify
@@ -206,9 +207,10 @@ class WebSocketIoServer(object):
     notifications using websocket events (send_ methods). Caller should simply create an instance
     of this class and pass a runner object then invoke the run method
     """
-    def __init__(self, http_root, runner):
+    def __init__(self, http_root, runner, logger):
         self.nfvbench_runner = runner
         setup_flask(http_root)
+        self.fluent_logger = logger
 
     def run(self, host='127.0.0.1', port=7556):
 
@@ -219,6 +221,7 @@ class WebSocketIoServer(object):
         # wait for run requests
         # the runner must be executed from the main thread (Trex client library requirement)
         while True:
+
             # print 'main thread waiting for requests...'
             config = Ctx.dequeue()
             # print 'main thread processing request...'
@@ -227,11 +230,12 @@ class WebSocketIoServer(object):
                 # remove unfilled values as we do not want them to override default values with None
                 config = {k: v for k, v in config.items() if v is not None}
                 with RunLock():
-                    results = self.nfvbench_runner.run(config)
+                    results = self.nfvbench_runner.run(config, config)
             except Exception as exc:
                 print 'NFVbench runner exception:'
                 traceback.print_exc()
                 results = result_json(STATUS_ERROR, str(exc))
+                LOG.exception()
 
             if Ctx.request_from_socketio:
                 socketio.emit('run_end', results)
@@ -239,6 +243,7 @@ class WebSocketIoServer(object):
                 # this might overwrite a previously unfetched result
                 Ctx.set_result(results)
             Ctx.release()
+            self.fluent_logger.send_run_summary(True)
 
     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}