NFVBENCH-153 Add support for python3
[nfvbench.git] / nfvbench / nfvbenchd.py
index aef896a..73e1342 100644 (file)
 #    under the License.
 #
 
+import json
+import queue
+from threading import Thread
+import uuid
+
 from flask import Flask
 from flask import jsonify
-from flask import render_template
 from flask import request
 
-from flask_socketio import emit
-from flask_socketio import SocketIO
+from .summarizer import NFVBenchSummarizer
 
-import json
-import Queue
-import traceback
-from utils import byteify
-from utils import RunLock
-import uuid
+from .log import LOG
+from .utils import byteify
+from .utils import RunLock
 
-# this global cannot reside in Ctx because of the @app and @socketio decorators
-app = None
-socketio = None
+from .__init__ import __version__
 
 STATUS_OK = 'OK'
 STATUS_ERROR = 'ERROR'
 STATUS_PENDING = 'PENDING'
 STATUS_NOT_FOUND = 'NOT_FOUND'
 
-
 def result_json(status, message, request_id=None):
     body = {
         'status': status,
         'error_message': message
     }
-
     if request_id is not None:
         body['request_id'] = request_id
 
@@ -61,18 +57,16 @@ def get_uuid():
 
 class Ctx(object):
     MAXLEN = 5
-    run_queue = Queue.Queue()
+    run_queue = queue.Queue()
     busy = False
     result = None
-    request_from_socketio = False
     results = {}
     ids = []
     current_id = None
 
     @staticmethod
-    def enqueue(config, request_id, from_socketio=False):
+    def enqueue(config, request_id):
         Ctx.busy = True
-        Ctx.request_from_socketio = from_socketio
         config['request_id'] = request_id
         Ctx.run_queue.put(config)
 
@@ -107,16 +101,15 @@ class Ctx(object):
                 res = Ctx.results[request_id]
             except KeyError:
                 return None
-
+            # pylint: disable=unsubscriptable-object
             if Ctx.result and request_id == Ctx.result['request_id']:
                 Ctx.result = None
-
-            return res
-        else:
-            res = Ctx.result
-            if res:
-                Ctx.result = None
             return res
+            # pylint: enable=unsubscriptable-object
+        res = Ctx.result
+        if res:
+            Ctx.result = None
+        return res
 
     @staticmethod
     def is_busy():
@@ -127,54 +120,33 @@ class Ctx(object):
         return Ctx.current_id
 
 
-def setup_flask(root_path):
-    global socketio
-    global app
+def setup_flask():
     app = Flask(__name__)
-    app.root_path = root_path
-    socketio = SocketIO(app, async_mode='threading')
     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
     not_found_msg = 'results not found'
     pending_msg = 'NFVbench run still pending'
 
-    # --------- socketio requests ------------
-
-    @socketio.on('start_run')
-    def socketio_start_run(config):
-        if not Ctx.is_busy():
-            Ctx.enqueue(config, get_uuid(), from_socketio=True)
-        else:
-            emit('error', {'reason': 'there is already an NFVbench request running'})
-
-    @socketio.on('echo')
-    def socketio_echo(config):
-        emit('echo', config)
-
     # --------- HTTP requests ------------
 
-    @app.route('/')
-    def index():
-        return render_template('index.html')
-
-    @app.route('/echo', methods=['GET'])
-    def echo():
-        config = request.json
-        return jsonify(config)
+    @app.route('/version', methods=['GET'])
+    def _version():
+        return __version__
 
     @app.route('/start_run', methods=['POST'])
-    def start_run():
+    def _start_run():
         config = load_json(request.json)
+        if not config:
+            config = {}
         if Ctx.is_busy():
             return jsonify(busy_json)
-        else:
-            request_id = get_uuid()
-            Ctx.enqueue(config, request_id)
-            return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
+        request_id = get_uuid()
+        Ctx.enqueue(config, request_id)
+        return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
 
     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
     @app.route('/status/<request_id>', methods=['GET'])
-    def get_status(request_id):
+    def _get_status(request_id):
         if request_id:
             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
                 # task with request_id still pending
@@ -184,68 +156,67 @@ def setup_flask(root_path):
             if res:
                 # found result for given request_id
                 return jsonify(res)
-            else:
-                # result for given request_id not found
-                return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
-        else:
-            if Ctx.is_busy():
-                # task still pending, return with request_id
-                return jsonify(result_json(STATUS_PENDING,
-                                           pending_msg,
-                                           Ctx.get_current_request_id()))
-
-            res = Ctx.get_result()
-            if res:
-                return jsonify(res)
-            else:
-                return jsonify(not_busy_json)
+            # result for given request_id not found
+            return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
+        if Ctx.is_busy():
+            # task still pending, return with request_id
+            return jsonify(result_json(STATUS_PENDING,
+                                       pending_msg,
+                                       Ctx.get_current_request_id()))
+
+        res = Ctx.get_result()
+        if res:
+            return jsonify(res)
+        return jsonify(not_busy_json)
 
+    return app
 
-class WebSocketIoServer(object):
-    """This class takes care of the web socketio server, accepts websocket events, and sends back
-    notifications using websocket events (send_ methods). Caller should simply create an instance
+class WebServer(object):
+    """This class takes care of the web server. Caller should simply create an instance
     of this class and pass a runner object then invoke the run method
     """
-    def __init__(self, http_root, runner):
+
+    def __init__(self, runner, fluent_logger):
         self.nfvbench_runner = runner
-        setup_flask(http_root)
+        self.app = setup_flask()
+        self.fluent_logger = fluent_logger
 
-    def run(self, host='127.0.0.1', port=7556):
+    def run(self, host, port):
 
-        # socketio.run will not return so we need to run it in a background thread so that
+        # app.run will not return so we need to run it in a background thread so that
         # the calling thread (main thread) can keep doing work
-        socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
+        Thread(target=self.app.run, args=(host, port)).start()
 
         # wait for run requests
         # the runner must be executed from the main thread (Trex client library requirement)
         while True:
+
             # print 'main thread waiting for requests...'
             config = Ctx.dequeue()
             # print 'main thread processing request...'
-            print config
+            print config
             try:
                 # remove unfilled values as we do not want them to override default values with None
-                config = {k: v for k, v in config.items() if v is not None}
+                config = {k: v for k, v in list(config.items()) if v is not None}
                 with RunLock():
-                    results = self.nfvbench_runner.run(config)
+                    if self.fluent_logger:
+                        self.fluent_logger.start_new_run()
+                    results = self.nfvbench_runner.run(config, config)
             except Exception as exc:
-                print 'NFVbench runner exception:'
-                traceback.print_exc()
                 results = result_json(STATUS_ERROR, str(exc))
+                LOG.exception('NFVbench runner exception:')
 
-            if Ctx.request_from_socketio:
-                socketio.emit('run_end', results)
-            else:
-                # this might overwrite a previously unfetched result
-                Ctx.set_result(results)
+            # this might overwrite a previously unfetched result
+            Ctx.set_result(results)
+            try:
+                summary = NFVBenchSummarizer(results['result'], self.fluent_logger)
+                LOG.info(str(summary))
+            except KeyError:
+                # in case of error, 'result' might be missing
+                if 'error_message' in results:
+                    LOG.error(results['error_message'])
+                else:
+                    LOG.error('REST request completed without results or error message')
             Ctx.release()
-
-    def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
-        stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
-        socketio.emit('run_interval_stats', stats)
-
-    def send_ndr_found(self, ndr_pps):
-        socketio.emit('ndr_found', {'rate_pps': ndr_pps})
-
-    def send_pdr_found(self, pdr_pps):
-        socketio.emit('pdr_found', {'rate_pps': pdr_pps})
+            if self.fluent_logger:
+                self.fluent_logger.send_run_summary(True)