Initial code drop from Cisco
[nfvbench.git] / nfvbench / nfvbenchd.py
diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py
new file mode 100644 (file)
index 0000000..aef896a
--- /dev/null
@@ -0,0 +1,251 @@
+#!/usr/bin/env python
+# Copyright 2017 Cisco Systems, Inc.  All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+from flask import Flask
+from flask import jsonify
+from flask import render_template
+from flask import request
+
+from flask_socketio import emit
+from flask_socketio import SocketIO
+
+import json
+import Queue
+import traceback
+from utils import byteify
+from utils import RunLock
+import uuid
+
+# this global cannot reside in Ctx because of the @app and @socketio decorators
+app = None
+socketio = None
+
+STATUS_OK = 'OK'
+STATUS_ERROR = 'ERROR'
+STATUS_PENDING = 'PENDING'
+STATUS_NOT_FOUND = 'NOT_FOUND'
+
+
+def result_json(status, message, request_id=None):
+    body = {
+        'status': status,
+        'error_message': message
+    }
+
+    if request_id is not None:
+        body['request_id'] = request_id
+
+    return body
+
+
+def load_json(data):
+    return json.loads(json.dumps(data), object_hook=byteify)
+
+
+def get_uuid():
+    return uuid.uuid4().hex
+
+
+class Ctx(object):
+    MAXLEN = 5
+    run_queue = Queue.Queue()
+    busy = False
+    result = None
+    request_from_socketio = False
+    results = {}
+    ids = []
+    current_id = None
+
+    @staticmethod
+    def enqueue(config, request_id, from_socketio=False):
+        Ctx.busy = True
+        Ctx.request_from_socketio = from_socketio
+        config['request_id'] = request_id
+        Ctx.run_queue.put(config)
+
+        if len(Ctx.ids) >= Ctx.MAXLEN:
+            try:
+                del Ctx.results[Ctx.ids.pop(0)]
+            except KeyError:
+                pass
+        Ctx.ids.append(request_id)
+
+    @staticmethod
+    def dequeue():
+        config = Ctx.run_queue.get()
+        Ctx.current_id = config['request_id']
+        return config
+
+    @staticmethod
+    def release():
+        Ctx.current_id = None
+        Ctx.busy = False
+
+    @staticmethod
+    def set_result(res):
+        res['request_id'] = Ctx.current_id
+        Ctx.results[Ctx.current_id] = res
+        Ctx.result = res
+
+    @staticmethod
+    def get_result(request_id=None):
+        if request_id:
+            try:
+                res = Ctx.results[request_id]
+            except KeyError:
+                return None
+
+            if Ctx.result and request_id == Ctx.result['request_id']:
+                Ctx.result = None
+
+            return res
+        else:
+            res = Ctx.result
+            if res:
+                Ctx.result = None
+            return res
+
+    @staticmethod
+    def is_busy():
+        return Ctx.busy
+
+    @staticmethod
+    def get_current_request_id():
+        return Ctx.current_id
+
+
+def setup_flask(root_path):
+    global socketio
+    global app
+    app = Flask(__name__)
+    app.root_path = root_path
+    socketio = SocketIO(app, async_mode='threading')
+    busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
+    not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
+    not_found_msg = 'results not found'
+    pending_msg = 'NFVbench run still pending'
+
+    # --------- socketio requests ------------
+
+    @socketio.on('start_run')
+    def socketio_start_run(config):
+        if not Ctx.is_busy():
+            Ctx.enqueue(config, get_uuid(), from_socketio=True)
+        else:
+            emit('error', {'reason': 'there is already an NFVbench request running'})
+
+    @socketio.on('echo')
+    def socketio_echo(config):
+        emit('echo', config)
+
+    # --------- HTTP requests ------------
+
+    @app.route('/')
+    def index():
+        return render_template('index.html')
+
+    @app.route('/echo', methods=['GET'])
+    def echo():
+        config = request.json
+        return jsonify(config)
+
+    @app.route('/start_run', methods=['POST'])
+    def start_run():
+        config = load_json(request.json)
+        if Ctx.is_busy():
+            return jsonify(busy_json)
+        else:
+            request_id = get_uuid()
+            Ctx.enqueue(config, request_id)
+            return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
+
+    @app.route('/status', defaults={'request_id': None}, methods=['GET'])
+    @app.route('/status/<request_id>', methods=['GET'])
+    def get_status(request_id):
+        if request_id:
+            if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
+                # task with request_id still pending
+                return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
+
+            res = Ctx.get_result(request_id)
+            if res:
+                # found result for given request_id
+                return jsonify(res)
+            else:
+                # result for given request_id not found
+                return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
+        else:
+            if Ctx.is_busy():
+                # task still pending, return with request_id
+                return jsonify(result_json(STATUS_PENDING,
+                                           pending_msg,
+                                           Ctx.get_current_request_id()))
+
+            res = Ctx.get_result()
+            if res:
+                return jsonify(res)
+            else:
+                return jsonify(not_busy_json)
+
+
+class WebSocketIoServer(object):
+    """This class takes care of the web socketio server, accepts websocket events, and sends back
+    notifications using websocket events (send_ methods). Caller should simply create an instance
+    of this class and pass a runner object then invoke the run method
+    """
+    def __init__(self, http_root, runner):
+        self.nfvbench_runner = runner
+        setup_flask(http_root)
+
+    def run(self, host='127.0.0.1', port=7556):
+
+        # socketio.run will not return so we need to run it in a background thread so that
+        # the calling thread (main thread) can keep doing work
+        socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
+
+        # wait for run requests
+        # the runner must be executed from the main thread (Trex client library requirement)
+        while True:
+            # print 'main thread waiting for requests...'
+            config = Ctx.dequeue()
+            # print 'main thread processing request...'
+            print config
+            try:
+                # remove unfilled values as we do not want them to override default values with None
+                config = {k: v for k, v in config.items() if v is not None}
+                with RunLock():
+                    results = self.nfvbench_runner.run(config)
+            except Exception as exc:
+                print 'NFVbench runner exception:'
+                traceback.print_exc()
+                results = result_json(STATUS_ERROR, str(exc))
+
+            if Ctx.request_from_socketio:
+                socketio.emit('run_end', results)
+            else:
+                # this might overwrite a previously unfetched result
+                Ctx.set_result(results)
+            Ctx.release()
+
+    def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
+        stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
+        socketio.emit('run_interval_stats', stats)
+
+    def send_ndr_found(self, ndr_pps):
+        socketio.emit('ndr_found', {'rate_pps': ndr_pps})
+
+    def send_pdr_found(self, pdr_pps):
+        socketio.emit('pdr_found', {'rate_pps': pdr_pps})