NFVBENCH-41 Add fluentd result tag to nfvbench server
[nfvbench.git] / nfvbench / nfvbenchd.py
index aef896a..1797496 100644 (file)
 #    under the License.
 #
 
+import json
+import Queue
+import traceback
+import uuid
+
 from flask import Flask
 from flask import jsonify
 from flask import render_template
@@ -21,13 +26,12 @@ from flask import request
 
 from flask_socketio import emit
 from flask_socketio import SocketIO
+from fluentd import FluentLogHandler
+from summarizer import NFVBenchSummarizer
 
-import json
-import Queue
-import traceback
+from log import LOG
 from utils import byteify
 from utils import RunLock
-import uuid
 
 # this global cannot reside in Ctx because of the @app and @socketio decorators
 app = None
@@ -141,40 +145,41 @@ def setup_flask(root_path):
     # --------- socketio requests ------------
 
     @socketio.on('start_run')
-    def socketio_start_run(config):
+    def _socketio_start_run(config):
         if not Ctx.is_busy():
             Ctx.enqueue(config, get_uuid(), from_socketio=True)
         else:
             emit('error', {'reason': 'there is already an NFVbench request running'})
 
     @socketio.on('echo')
-    def socketio_echo(config):
+    def _socketio_echo(config):
         emit('echo', config)
 
     # --------- HTTP requests ------------
 
     @app.route('/')
-    def index():
+    def _index():
         return render_template('index.html')
 
     @app.route('/echo', methods=['GET'])
-    def echo():
+    def _echo():
         config = request.json
         return jsonify(config)
 
     @app.route('/start_run', methods=['POST'])
-    def start_run():
+    def _start_run():
         config = load_json(request.json)
+        if not config:
+            config = {}
         if Ctx.is_busy():
             return jsonify(busy_json)
-        else:
-            request_id = get_uuid()
-            Ctx.enqueue(config, request_id)
-            return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
+        request_id = get_uuid()
+        Ctx.enqueue(config, request_id)
+        return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
 
     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
     @app.route('/status/<request_id>', methods=['GET'])
-    def get_status(request_id):
+    def _get_status(request_id):
         if request_id:
             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
                 # task with request_id still pending
@@ -184,9 +189,8 @@ def setup_flask(root_path):
             if res:
                 # found result for given request_id
                 return jsonify(res)
-            else:
-                # result for given request_id not found
-                return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
+            # result for given request_id not found
+            return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
         else:
             if Ctx.is_busy():
                 # task still pending, return with request_id
@@ -197,8 +201,7 @@ def setup_flask(root_path):
             res = Ctx.get_result()
             if res:
                 return jsonify(res)
-            else:
-                return jsonify(not_busy_json)
+            return jsonify(not_busy_json)
 
 
 class WebSocketIoServer(object):
@@ -206,9 +209,18 @@ class WebSocketIoServer(object):
     notifications using websocket events (send_ methods). Caller should simply create an instance
     of this class and pass a runner object then invoke the run method
     """
-    def __init__(self, http_root, runner):
+
+    def __init__(self, http_root, runner, logger, result_tag):
         self.nfvbench_runner = runner
         setup_flask(http_root)
+        self.fluent_logger = logger
+        self.result_fluent_logger = None
+        if result_tag:
+            self.result_fluent_logger = \
+                FluentLogHandler(result_tag,
+                                 fluentd_ip=self.fluent_logger.sender.host,
+                                 fluentd_port=self.fluent_logger.sender.port)
+            self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
 
     def run(self, host='127.0.0.1', port=7556):
 
@@ -219,6 +231,7 @@ class WebSocketIoServer(object):
         # wait for run requests
         # the runner must be executed from the main thread (Trex client library requirement)
         while True:
+
             # print 'main thread waiting for requests...'
             config = Ctx.dequeue()
             # print 'main thread processing request...'
@@ -227,18 +240,25 @@ class WebSocketIoServer(object):
                 # remove unfilled values as we do not want them to override default values with None
                 config = {k: v for k, v in config.items() if v is not None}
                 with RunLock():
-                    results = self.nfvbench_runner.run(config)
+                    results = self.nfvbench_runner.run(config, config)
             except Exception as exc:
                 print 'NFVbench runner exception:'
                 traceback.print_exc()
                 results = result_json(STATUS_ERROR, str(exc))
+                LOG.exception()
 
             if Ctx.request_from_socketio:
                 socketio.emit('run_end', results)
             else:
                 # this might overwrite a previously unfetched result
                 Ctx.set_result(results)
+            if self.fluent_logger:
+                self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
+            summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
+            LOG.info(str(summary))
             Ctx.release()
+            if self.fluent_logger:
+                self.fluent_logger.send_run_summary(True)
 
     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}