X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=nfvbench%2Fnfvbenchd.py;h=73e1342dd8fe404b58bf424444737ec7561de870;hb=95f2491ed89ac99b0d8bd006b4a13cbeb1eb96ce;hp=fa781afecbcc8ae34e5ff6b1ab97ee8c78d9337e;hpb=59db88a0554459514e40fd512e3d105820a9086f;p=nfvbench.git diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py index fa781af..73e1342 100644 --- a/nfvbench/nfvbenchd.py +++ b/nfvbench/nfvbenchd.py @@ -15,38 +15,32 @@ # import json -import Queue +import queue +from threading import Thread import uuid from flask import Flask from flask import jsonify -from flask import render_template from flask import request -from flask_socketio import emit -from flask_socketio import SocketIO -from summarizer import NFVBenchSummarizer +from .summarizer import NFVBenchSummarizer -from log import LOG -from utils import byteify -from utils import RunLock +from .log import LOG +from .utils import byteify +from .utils import RunLock -# this global cannot reside in Ctx because of the @app and @socketio decorators -app = None -socketio = None +from .__init__ import __version__ STATUS_OK = 'OK' STATUS_ERROR = 'ERROR' STATUS_PENDING = 'PENDING' STATUS_NOT_FOUND = 'NOT_FOUND' - def result_json(status, message, request_id=None): body = { 'status': status, 'error_message': message } - if request_id is not None: body['request_id'] = request_id @@ -63,18 +57,16 @@ def get_uuid(): class Ctx(object): MAXLEN = 5 - run_queue = Queue.Queue() + run_queue = queue.Queue() busy = False result = None - request_from_socketio = False results = {} ids = [] current_id = None @staticmethod - def enqueue(config, request_id, from_socketio=False): + def enqueue(config, request_id): Ctx.busy = True - Ctx.request_from_socketio = from_socketio config['request_id'] = request_id Ctx.run_queue.put(config) @@ -109,16 +101,15 @@ class Ctx(object): res = Ctx.results[request_id] except KeyError: return None - + # pylint: disable=unsubscriptable-object if Ctx.result and request_id == Ctx.result['request_id']: Ctx.result = None - - return res - else: - res = Ctx.result - if res: - Ctx.result = None return res + # pylint: enable=unsubscriptable-object + res = Ctx.result + if res: + Ctx.result = None + return res @staticmethod def is_busy(): @@ -129,40 +120,18 @@ class Ctx(object): return Ctx.current_id -def setup_flask(root_path): - global socketio - global app +def setup_flask(): app = Flask(__name__) - app.root_path = root_path - socketio = SocketIO(app, async_mode='threading') busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running') not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run') not_found_msg = 'results not found' pending_msg = 'NFVbench run still pending' - # --------- socketio requests ------------ - - @socketio.on('start_run') - def _socketio_start_run(config): - if not Ctx.is_busy(): - Ctx.enqueue(config, get_uuid(), from_socketio=True) - else: - emit('error', {'reason': 'there is already an NFVbench request running'}) - - @socketio.on('echo') - def _socketio_echo(config): - emit('echo', config) - # --------- HTTP requests ------------ - @app.route('/') - def _index(): - return render_template('index.html') - - @app.route('/echo', methods=['GET']) - def _echo(): - config = request.json - return jsonify(config) + @app.route('/version', methods=['GET']) + def _version(): + return __version__ @app.route('/start_run', methods=['POST']) def _start_run(): @@ -189,35 +158,34 @@ def setup_flask(root_path): return jsonify(res) # result for given request_id not found return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id)) - else: - if Ctx.is_busy(): - # task still pending, return with request_id - return jsonify(result_json(STATUS_PENDING, - pending_msg, - Ctx.get_current_request_id())) - - res = Ctx.get_result() - if res: - return jsonify(res) - return jsonify(not_busy_json) + if Ctx.is_busy(): + # task still pending, return with request_id + return jsonify(result_json(STATUS_PENDING, + pending_msg, + Ctx.get_current_request_id())) + res = Ctx.get_result() + if res: + return jsonify(res) + return jsonify(not_busy_json) -class WebSocketIoServer(object): - """This class takes care of the web socketio server, accepts websocket events, and sends back - notifications using websocket events (send_ methods). Caller should simply create an instance + return app + +class WebServer(object): + """This class takes care of the web server. Caller should simply create an instance of this class and pass a runner object then invoke the run method """ - def __init__(self, http_root, runner, fluent_logger): + def __init__(self, runner, fluent_logger): self.nfvbench_runner = runner - setup_flask(http_root) + self.app = setup_flask() self.fluent_logger = fluent_logger - def run(self, host='127.0.0.1', port=7556): + def run(self, host, port): - # socketio.run will not return so we need to run it in a background thread so that + # app.run will not return so we need to run it in a background thread so that # the calling thread (main thread) can keep doing work - socketio.start_background_task(target=socketio.run, app=app, host=host, port=port) + Thread(target=self.app.run, args=(host, port)).start() # wait for run requests # the runner must be executed from the main thread (Trex client library requirement) @@ -229,7 +197,7 @@ class WebSocketIoServer(object): # print config try: # remove unfilled values as we do not want them to override default values with None - config = {k: v for k, v in config.items() if v is not None} + config = {k: v for k, v in list(config.items()) if v is not None} with RunLock(): if self.fluent_logger: self.fluent_logger.start_new_run() @@ -238,11 +206,8 @@ class WebSocketIoServer(object): results = result_json(STATUS_ERROR, str(exc)) LOG.exception('NFVbench runner exception:') - if Ctx.request_from_socketio: - socketio.emit('run_end', results) - else: - # this might overwrite a previously unfetched result - Ctx.set_result(results) + # this might overwrite a previously unfetched result + Ctx.set_result(results) try: summary = NFVBenchSummarizer(results['result'], self.fluent_logger) LOG.info(str(summary)) @@ -255,13 +220,3 @@ class WebSocketIoServer(object): Ctx.release() if self.fluent_logger: self.fluent_logger.send_run_summary(True) - - def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct): - stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct} - socketio.emit('run_interval_stats', stats) - - def send_ndr_found(self, ndr_pps): - socketio.emit('ndr_found', {'rate_pps': ndr_pps}) - - def send_pdr_found(self, pdr_pps): - socketio.emit('pdr_found', {'rate_pps': pdr_pps})