# under the License.
#
+import json
+import queue
+from threading import Thread
+import uuid
+
from flask import Flask
from flask import jsonify
-from flask import render_template
from flask import request
-from flask_socketio import emit
-from flask_socketio import SocketIO
-from fluentd import FluentLogHandler
-from summarizer import NFVBenchSummarizer
+from .summarizer import NFVBenchSummarizer
-import json
-from log import LOG
-import Queue
-import traceback
-from utils import byteify
-from utils import RunLock
-import uuid
+from .log import LOG
+from .utils import RunLock
-# this global cannot reside in Ctx because of the @app and @socketio decorators
-app = None
-socketio = None
+from .__init__ import __version__
STATUS_OK = 'OK'
STATUS_ERROR = 'ERROR'
STATUS_PENDING = 'PENDING'
STATUS_NOT_FOUND = 'NOT_FOUND'
-
def result_json(status, message, request_id=None):
body = {
'status': status,
'error_message': message
}
-
if request_id is not None:
body['request_id'] = request_id
def load_json(data):
- return json.loads(json.dumps(data), object_hook=byteify)
+ return json.loads(json.dumps(data))
def get_uuid():
class Ctx(object):
MAXLEN = 5
- run_queue = Queue.Queue()
+ run_queue = queue.Queue()
busy = False
result = None
- request_from_socketio = False
results = {}
ids = []
current_id = None
@staticmethod
- def enqueue(config, request_id, from_socketio=False):
+ def enqueue(config, request_id):
Ctx.busy = True
- Ctx.request_from_socketio = from_socketio
config['request_id'] = request_id
Ctx.run_queue.put(config)
res = Ctx.results[request_id]
except KeyError:
return None
-
+ # pylint: disable=unsubscriptable-object
if Ctx.result and request_id == Ctx.result['request_id']:
Ctx.result = None
-
- return res
- else:
- res = Ctx.result
- if res:
- Ctx.result = None
return res
+ # pylint: enable=unsubscriptable-object
+ res = Ctx.result
+ if res:
+ Ctx.result = None
+ return res
@staticmethod
def is_busy():
return Ctx.current_id
-def setup_flask(root_path):
- global socketio
- global app
+def setup_flask():
app = Flask(__name__)
- app.root_path = root_path
- socketio = SocketIO(app, async_mode='threading')
busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
- config_is_null_json = result_json(STATUS_ERROR, 'configuration is missing')
not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
not_found_msg = 'results not found'
pending_msg = 'NFVbench run still pending'
- # --------- socketio requests ------------
-
- @socketio.on('start_run')
- def socketio_start_run(config):
- if not Ctx.is_busy():
- Ctx.enqueue(config, get_uuid(), from_socketio=True)
- else:
- emit('error', {'reason': 'there is already an NFVbench request running'})
-
- @socketio.on('echo')
- def socketio_echo(config):
- emit('echo', config)
-
# --------- HTTP requests ------------
- @app.route('/')
- def index():
- return render_template('index.html')
-
- @app.route('/echo', methods=['GET'])
- def echo():
- config = request.json
- return jsonify(config)
+ @app.route('/version', methods=['GET'])
+ def _version():
+ return __version__
@app.route('/start_run', methods=['POST'])
- def start_run():
+ def _start_run():
config = load_json(request.json)
- if config:
- if Ctx.is_busy():
- return jsonify(busy_json)
- else:
- request_id = get_uuid()
- Ctx.enqueue(config, request_id)
- return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
- else:
- return jsonify(config_is_null_json)
+ if not config:
+ config = {}
+ if Ctx.is_busy():
+ return jsonify(busy_json)
+ request_id = get_uuid()
+ Ctx.enqueue(config, request_id)
+ return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
@app.route('/status', defaults={'request_id': None}, methods=['GET'])
@app.route('/status/<request_id>', methods=['GET'])
- def get_status(request_id):
+ def _get_status(request_id):
if request_id:
if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
# task with request_id still pending
if res:
# found result for given request_id
return jsonify(res)
- else:
- # result for given request_id not found
- return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
- else:
- if Ctx.is_busy():
- # task still pending, return with request_id
- return jsonify(result_json(STATUS_PENDING,
- pending_msg,
- Ctx.get_current_request_id()))
-
- res = Ctx.get_result()
- if res:
- return jsonify(res)
- else:
- return jsonify(not_busy_json)
-
-
-class WebSocketIoServer(object):
- """This class takes care of the web socketio server, accepts websocket events, and sends back
- notifications using websocket events (send_ methods). Caller should simply create an instance
+ # result for given request_id not found
+ return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
+ if Ctx.is_busy():
+ # task still pending, return with request_id
+ return jsonify(result_json(STATUS_PENDING,
+ pending_msg,
+ Ctx.get_current_request_id()))
+
+ res = Ctx.get_result()
+ if res:
+ return jsonify(res)
+ return jsonify(not_busy_json)
+
+ return app
+
+class WebServer(object):
+ """This class takes care of the web server. Caller should simply create an instance
of this class and pass a runner object then invoke the run method
"""
- def __init__(self, http_root, runner, logger):
+ def __init__(self, runner, fluent_logger):
self.nfvbench_runner = runner
- setup_flask(http_root)
- self.fluent_logger = logger
- self.result_fluent_logger = FluentLogHandler("resultnfvbench",
- fluentd_ip=self.fluent_logger.sender.host,
- fluentd_port=self.fluent_logger.sender.port) \
- if self.fluent_logger else None
+ self.app = setup_flask()
+ self.fluent_logger = fluent_logger
- def run(self, host='127.0.0.1', port=7556):
+ def run(self, host, port):
- # socketio.run will not return so we need to run it in a background thread so that
+ # app.run will not return so we need to run it in a background thread so that
# the calling thread (main thread) can keep doing work
- socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
+ Thread(target=self.app.run, args=(host, port)).start()
# wait for run requests
# the runner must be executed from the main thread (Trex client library requirement)
# print 'main thread waiting for requests...'
config = Ctx.dequeue()
# print 'main thread processing request...'
- print config
+ # print config
try:
# remove unfilled values as we do not want them to override default values with None
- config = {k: v for k, v in config.items() if v is not None}
+ config = {k: v for k, v in list(config.items()) if v is not None}
with RunLock():
+ if self.fluent_logger:
+ self.fluent_logger.start_new_run()
results = self.nfvbench_runner.run(config, config)
except Exception as exc:
- print 'NFVbench runner exception:'
- traceback.print_exc()
results = result_json(STATUS_ERROR, str(exc))
- LOG.exception()
-
- if Ctx.request_from_socketio:
- socketio.emit('run_end', results)
- else:
- # this might overwrite a previously unfetched result
- Ctx.set_result(results)
- summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
- LOG.info(str(summary))
+ LOG.exception('NFVbench runner exception:')
+
+ # this might overwrite a previously unfetched result
+ Ctx.set_result(results)
+ try:
+ summary = NFVBenchSummarizer(results['result'], self.fluent_logger)
+ LOG.info(str(summary))
+ if 'json' in config and 'result' in results and results['status']:
+ self.nfvbench_runner.save(results['result'])
+ except KeyError:
+ # in case of error, 'result' might be missing
+ if 'error_message' in results:
+ LOG.error(results['error_message'])
+ else:
+ LOG.error('REST request completed without results or error message')
Ctx.release()
if self.fluent_logger:
self.fluent_logger.send_run_summary(True)
-
- def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
- stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
- socketio.emit('run_interval_stats', stats)
-
- def send_ndr_found(self, ndr_pps):
- socketio.emit('ndr_found', {'rate_pps': ndr_pps})
-
- def send_pdr_found(self, pdr_pps):
- socketio.emit('pdr_found', {'rate_pps': pdr_pps})