X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=nfvbench%2Fnfvbenchd.py;h=73e1342dd8fe404b58bf424444737ec7561de870;hb=95f2491ed89ac99b0d8bd006b4a13cbeb1eb96ce;hp=1e096aefa94923e402ec79a65e11b86d72a9d651;hpb=34cbe7031415297ee5c2b6c7059801603398fa7f;p=nfvbench.git diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py index 1e096ae..73e1342 100644 --- a/nfvbench/nfvbenchd.py +++ b/nfvbench/nfvbenchd.py @@ -14,40 +14,33 @@ # under the License. # +import json +import queue +from threading import Thread +import uuid + from flask import Flask from flask import jsonify -from flask import render_template from flask import request -from flask_socketio import emit -from flask_socketio import SocketIO -from fluentd import FluentLogHandler -from summarizer import NFVBenchSummarizer +from .summarizer import NFVBenchSummarizer -import json -from log import LOG -import Queue -import traceback -from utils import byteify -from utils import RunLock -import uuid +from .log import LOG +from .utils import byteify +from .utils import RunLock -# this global cannot reside in Ctx because of the @app and @socketio decorators -app = None -socketio = None +from .__init__ import __version__ STATUS_OK = 'OK' STATUS_ERROR = 'ERROR' STATUS_PENDING = 'PENDING' STATUS_NOT_FOUND = 'NOT_FOUND' - def result_json(status, message, request_id=None): body = { 'status': status, 'error_message': message } - if request_id is not None: body['request_id'] = request_id @@ -64,18 +57,16 @@ def get_uuid(): class Ctx(object): MAXLEN = 5 - run_queue = Queue.Queue() + run_queue = queue.Queue() busy = False result = None - request_from_socketio = False results = {} ids = [] current_id = None @staticmethod - def enqueue(config, request_id, from_socketio=False): + def enqueue(config, request_id): Ctx.busy = True - Ctx.request_from_socketio = from_socketio config['request_id'] = request_id Ctx.run_queue.put(config) @@ -110,16 +101,15 @@ class Ctx(object): res = Ctx.results[request_id] except KeyError: return None - + # pylint: disable=unsubscriptable-object if Ctx.result and request_id == Ctx.result['request_id']: Ctx.result = None - - return res - else: - res = Ctx.result - if res: - Ctx.result = None return res + # pylint: enable=unsubscriptable-object + res = Ctx.result + if res: + Ctx.result = None + return res @staticmethod def is_busy(): @@ -130,43 +120,21 @@ class Ctx(object): return Ctx.current_id -def setup_flask(root_path): - global socketio - global app +def setup_flask(): app = Flask(__name__) - app.root_path = root_path - socketio = SocketIO(app, async_mode='threading') busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running') not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run') not_found_msg = 'results not found' pending_msg = 'NFVbench run still pending' - # --------- socketio requests ------------ - - @socketio.on('start_run') - def socketio_start_run(config): - if not Ctx.is_busy(): - Ctx.enqueue(config, get_uuid(), from_socketio=True) - else: - emit('error', {'reason': 'there is already an NFVbench request running'}) - - @socketio.on('echo') - def socketio_echo(config): - emit('echo', config) - # --------- HTTP requests ------------ - @app.route('/') - def index(): - return render_template('index.html') - - @app.route('/echo', methods=['GET']) - def echo(): - config = request.json - return jsonify(config) + @app.route('/version', methods=['GET']) + def _version(): + return __version__ @app.route('/start_run', methods=['POST']) - def start_run(): + def _start_run(): config = load_json(request.json) if not config: config = {} @@ -178,7 +146,7 @@ def setup_flask(root_path): @app.route('/status', defaults={'request_id': None}, methods=['GET']) @app.route('/status/', methods=['GET']) - def get_status(request_id): + def _get_status(request_id): if request_id: if Ctx.is_busy() and request_id == Ctx.get_current_request_id(): # task with request_id still pending @@ -188,46 +156,36 @@ def setup_flask(root_path): if res: # found result for given request_id return jsonify(res) - else: - # result for given request_id not found - return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id)) - else: - if Ctx.is_busy(): - # task still pending, return with request_id - return jsonify(result_json(STATUS_PENDING, - pending_msg, - Ctx.get_current_request_id())) - - res = Ctx.get_result() - if res: - return jsonify(res) - else: - return jsonify(not_busy_json) + # result for given request_id not found + return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id)) + if Ctx.is_busy(): + # task still pending, return with request_id + return jsonify(result_json(STATUS_PENDING, + pending_msg, + Ctx.get_current_request_id())) + + res = Ctx.get_result() + if res: + return jsonify(res) + return jsonify(not_busy_json) + return app -class WebSocketIoServer(object): - """This class takes care of the web socketio server, accepts websocket events, and sends back - notifications using websocket events (send_ methods). Caller should simply create an instance +class WebServer(object): + """This class takes care of the web server. Caller should simply create an instance of this class and pass a runner object then invoke the run method """ - def __init__(self, http_root, runner, logger): + def __init__(self, runner, fluent_logger): self.nfvbench_runner = runner - setup_flask(http_root) - self.fluent_logger = logger - self.result_fluent_logger = None - if self.fluent_logger: - self.result_fluent_logger = \ - FluentLogHandler("resultnfvbench", - fluentd_ip=self.fluent_logger.sender.host, - fluentd_port=self.fluent_logger.sender.port) - self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate - - def run(self, host='127.0.0.1', port=7556): - - # socketio.run will not return so we need to run it in a background thread so that + self.app = setup_flask() + self.fluent_logger = fluent_logger + + def run(self, host, port): + + # app.run will not return so we need to run it in a background thread so that # the calling thread (main thread) can keep doing work - socketio.start_background_task(target=socketio.run, app=app, host=host, port=port) + Thread(target=self.app.run, args=(host, port)).start() # wait for run requests # the runner must be executed from the main thread (Trex client library requirement) @@ -236,37 +194,29 @@ class WebSocketIoServer(object): # print 'main thread waiting for requests...' config = Ctx.dequeue() # print 'main thread processing request...' - print config + # print config try: # remove unfilled values as we do not want them to override default values with None - config = {k: v for k, v in config.items() if v is not None} + config = {k: v for k, v in list(config.items()) if v is not None} with RunLock(): + if self.fluent_logger: + self.fluent_logger.start_new_run() results = self.nfvbench_runner.run(config, config) except Exception as exc: - print 'NFVbench runner exception:' - traceback.print_exc() results = result_json(STATUS_ERROR, str(exc)) - LOG.exception() + LOG.exception('NFVbench runner exception:') - if Ctx.request_from_socketio: - socketio.emit('run_end', results) - else: - # this might overwrite a previously unfetched result - Ctx.set_result(results) - if self.fluent_logger: - self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate - summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger) - LOG.info(str(summary)) + # this might overwrite a previously unfetched result + Ctx.set_result(results) + try: + summary = NFVBenchSummarizer(results['result'], self.fluent_logger) + LOG.info(str(summary)) + except KeyError: + # in case of error, 'result' might be missing + if 'error_message' in results: + LOG.error(results['error_message']) + else: + LOG.error('REST request completed without results or error message') Ctx.release() if self.fluent_logger: self.fluent_logger.send_run_summary(True) - - def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct): - stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct} - socketio.emit('run_interval_stats', stats) - - def send_ndr_found(self, ndr_pps): - socketio.emit('ndr_found', {'rate_pps': ndr_pps}) - - def send_pdr_found(self, pdr_pps): - socketio.emit('pdr_found', {'rate_pps': pdr_pps})