2 # Copyright 2017 Cisco Systems, Inc. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
22 from flask import Flask
23 from flask import jsonify
24 from flask import render_template
25 from flask import request
27 from flask_socketio import emit
28 from flask_socketio import SocketIO
29 from fluentd import FluentLogHandler
30 from summarizer import NFVBenchSummarizer
33 from utils import byteify
34 from utils import RunLock
37 # this global cannot reside in Ctx because of the @app and @socketio decorators
42 STATUS_ERROR = 'ERROR'
43 STATUS_PENDING = 'PENDING'
44 STATUS_NOT_FOUND = 'NOT_FOUND'
47 def result_json(status, message, request_id=None):
50 'error_message': message
53 if request_id is not None:
54 body['request_id'] = request_id
60 return json.loads(json.dumps(data), object_hook=byteify)
64 return uuid.uuid4().hex
69 run_queue = Queue.Queue()
72 request_from_socketio = False
78 def enqueue(config, request_id, from_socketio=False):
80 Ctx.request_from_socketio = from_socketio
81 config['request_id'] = request_id
82 Ctx.run_queue.put(config)
84 if len(Ctx.ids) >= Ctx.MAXLEN:
86 del Ctx.results[Ctx.ids.pop(0)]
89 Ctx.ids.append(request_id)
93 config = Ctx.run_queue.get()
94 Ctx.current_id = config['request_id']
104 res['request_id'] = Ctx.current_id
105 Ctx.results[Ctx.current_id] = res
109 def get_result(request_id=None):
112 res = Ctx.results[request_id]
116 if Ctx.result and request_id == Ctx.result['request_id']:
def get_current_request_id():
    """Return the request id currently recorded in Ctx.current_id.

    Ctx.current_id is assigned from the 'request_id' entry of a config
    taken off Ctx.run_queue.
    """
    current_request_id = Ctx.current_id
    return current_request_id
135 def setup_flask(root_path):
138 app = Flask(__name__)
139 app.root_path = root_path
140 socketio = SocketIO(app, async_mode='threading')
141 busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
142 not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
143 not_found_msg = 'results not found'
144 pending_msg = 'NFVbench run still pending'
146 # --------- socketio requests ------------
148 @socketio.on('start_run')
149 def _socketio_start_run(config):
150 if not Ctx.is_busy():
151 Ctx.enqueue(config, get_uuid(), from_socketio=True)
153 emit('error', {'reason': 'there is already an NFVbench request running'})
156 def _socketio_echo(config):
159 # --------- HTTP requests ------------
163 return render_template('index.html')
165 @app.route('/echo', methods=['GET'])
167 config = request.json
168 return jsonify(config)
170 @app.route('/start_run', methods=['POST'])
172 config = load_json(request.json)
176 return jsonify(busy_json)
177 request_id = get_uuid()
178 Ctx.enqueue(config, request_id)
179 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
181 @app.route('/status', defaults={'request_id': None}, methods=['GET'])
182 @app.route('/status/<request_id>', methods=['GET'])
183 def _get_status(request_id):
185 if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
186 # task with request_id still pending
187 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
189 res = Ctx.get_result(request_id)
191 # found result for given request_id
193 # result for given request_id not found
194 return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
197 # task still pending, return with request_id
198 return jsonify(result_json(STATUS_PENDING,
200 Ctx.get_current_request_id()))
202 res = Ctx.get_result()
205 return jsonify(not_busy_json)
208 class WebSocketIoServer(object):
209 """This class takes care of the web socketio server, accepts websocket events, and sends back
210 notifications using websocket events (send_ methods). Caller should simply create an instance
211 of this class and pass a runner object then invoke the run method
def __init__(self, http_root, runner, logger):
    """Set up the flask/socketio application and the optional result logger.

    :param http_root: path handed to setup_flask() (used there as the flask root path)
    :param runner: object stored as self.nfvbench_runner; its run() method is
                   invoked for every dequeued request
    :param logger: fluentd log handler, or a falsy value when fluentd logging is
                   not used (presumably a FluentLogHandler - it must expose
                   sender.host, sender.port and runlogdate)
    """
    self.nfvbench_runner = runner
    setup_flask(http_root)
    self.fluent_logger = logger
    self.result_fluent_logger = None
    if not logger:
        # no fluentd logger provided: no result logger either
        return
    # the result logger targets the same fluentd endpoint as the main logger
    # but uses its own "resultnfvbench" tag
    self.result_fluent_logger = FluentLogHandler("resultnfvbench",
                                                 fluentd_ip=logger.sender.host,
                                                 fluentd_port=logger.sender.port)
    # keep both loggers on the same run log date
    self.result_fluent_logger.runlogdate = logger.runlogdate
226 def run(self, host='127.0.0.1', port=7556):
228 # socketio.run will not return so we need to run it in a background thread so that
229 # the calling thread (main thread) can keep doing work
230 socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
232 # wait for run requests
233 # the runner must be executed from the main thread (Trex client library requirement)
236 # print 'main thread waiting for requests...'
237 config = Ctx.dequeue()
238 # print 'main thread processing request...'
241 # remove unfilled values as we do not want them to override default values with None
242 config = {k: v for k, v in config.items() if v is not None}
244 results = self.nfvbench_runner.run(config, config)
245 except Exception as exc:
246 print 'NFVbench runner exception:'
247 traceback.print_exc()
248 results = result_json(STATUS_ERROR, str(exc))
251 if Ctx.request_from_socketio:
252 socketio.emit('run_end', results)
254 # this might overwrite a previously unfetched result
255 Ctx.set_result(results)
256 if self.fluent_logger:
257 self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
258 summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
259 LOG.info(str(summary))
261 if self.fluent_logger:
262 self.fluent_logger.send_run_summary(True)
def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
    """Emit a 'run_interval_stats' socketio event.

    The event payload is a dict holding the 4 arguments under the keys
    'time_ms', 'tx_pps', 'rx_pps' and 'drop_pct'.
    """
    socketio.emit('run_interval_stats',
                  dict(time_ms=time_ms,
                       tx_pps=tx_pps,
                       rx_pps=rx_pps,
                       drop_pct=drop_pct))
def send_ndr_found(self, ndr_pps):
    """Emit an 'ndr_found' socketio event carrying ndr_pps under the 'rate_pps' key."""
    payload = {'rate_pps': ndr_pps}
    socketio.emit('ndr_found', payload)
def send_pdr_found(self, pdr_pps):
    """Emit a 'pdr_found' socketio event carrying pdr_pps under the 'rate_pps' key."""
    payload = {'rate_pps': pdr_pps}
    socketio.emit('pdr_found', payload)