NFVBENCH-44 Use message when logging exception
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 import json
18 import Queue
19 import uuid
20
21 from flask import Flask
22 from flask import jsonify
23 from flask import render_template
24 from flask import request
25
26 from flask_socketio import emit
27 from flask_socketio import SocketIO
28 from summarizer import NFVBenchSummarizer
29
30 from log import LOG
31 from utils import byteify
32 from utils import RunLock
33
34 # this global cannot reside in Ctx because of the @app and @socketio decorators
35 app = None
36 socketio = None
37
38 STATUS_OK = 'OK'
39 STATUS_ERROR = 'ERROR'
40 STATUS_PENDING = 'PENDING'
41 STATUS_NOT_FOUND = 'NOT_FOUND'
42
43
44 def result_json(status, message, request_id=None):
45     body = {
46         'status': status,
47         'error_message': message
48     }
49
50     if request_id is not None:
51         body['request_id'] = request_id
52
53     return body
54
55
56 def load_json(data):
57     return json.loads(json.dumps(data), object_hook=byteify)
58
59
60 def get_uuid():
61     return uuid.uuid4().hex
62
63
64 class Ctx(object):
65     MAXLEN = 5
66     run_queue = Queue.Queue()
67     busy = False
68     result = None
69     request_from_socketio = False
70     results = {}
71     ids = []
72     current_id = None
73
74     @staticmethod
75     def enqueue(config, request_id, from_socketio=False):
76         Ctx.busy = True
77         Ctx.request_from_socketio = from_socketio
78         config['request_id'] = request_id
79         Ctx.run_queue.put(config)
80
81         if len(Ctx.ids) >= Ctx.MAXLEN:
82             try:
83                 del Ctx.results[Ctx.ids.pop(0)]
84             except KeyError:
85                 pass
86         Ctx.ids.append(request_id)
87
88     @staticmethod
89     def dequeue():
90         config = Ctx.run_queue.get()
91         Ctx.current_id = config['request_id']
92         return config
93
94     @staticmethod
95     def release():
96         Ctx.current_id = None
97         Ctx.busy = False
98
99     @staticmethod
100     def set_result(res):
101         res['request_id'] = Ctx.current_id
102         Ctx.results[Ctx.current_id] = res
103         Ctx.result = res
104
105     @staticmethod
106     def get_result(request_id=None):
107         if request_id:
108             try:
109                 res = Ctx.results[request_id]
110             except KeyError:
111                 return None
112
113             if Ctx.result and request_id == Ctx.result['request_id']:
114                 Ctx.result = None
115
116             return res
117         else:
118             res = Ctx.result
119             if res:
120                 Ctx.result = None
121             return res
122
123     @staticmethod
124     def is_busy():
125         return Ctx.busy
126
127     @staticmethod
128     def get_current_request_id():
129         return Ctx.current_id
130
131
132 def setup_flask(root_path):
133     global socketio
134     global app
135     app = Flask(__name__)
136     app.root_path = root_path
137     socketio = SocketIO(app, async_mode='threading')
138     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
139     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
140     not_found_msg = 'results not found'
141     pending_msg = 'NFVbench run still pending'
142
143     # --------- socketio requests ------------
144
145     @socketio.on('start_run')
146     def _socketio_start_run(config):
147         if not Ctx.is_busy():
148             Ctx.enqueue(config, get_uuid(), from_socketio=True)
149         else:
150             emit('error', {'reason': 'there is already an NFVbench request running'})
151
152     @socketio.on('echo')
153     def _socketio_echo(config):
154         emit('echo', config)
155
156     # --------- HTTP requests ------------
157
158     @app.route('/')
159     def _index():
160         return render_template('index.html')
161
162     @app.route('/echo', methods=['GET'])
163     def _echo():
164         config = request.json
165         return jsonify(config)
166
167     @app.route('/start_run', methods=['POST'])
168     def _start_run():
169         config = load_json(request.json)
170         if not config:
171             config = {}
172         if Ctx.is_busy():
173             return jsonify(busy_json)
174         request_id = get_uuid()
175         Ctx.enqueue(config, request_id)
176         return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
177
178     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
179     @app.route('/status/<request_id>', methods=['GET'])
180     def _get_status(request_id):
181         if request_id:
182             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
183                 # task with request_id still pending
184                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
185
186             res = Ctx.get_result(request_id)
187             if res:
188                 # found result for given request_id
189                 return jsonify(res)
190             # result for given request_id not found
191             return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
192         else:
193             if Ctx.is_busy():
194                 # task still pending, return with request_id
195                 return jsonify(result_json(STATUS_PENDING,
196                                            pending_msg,
197                                            Ctx.get_current_request_id()))
198
199             res = Ctx.get_result()
200             if res:
201                 return jsonify(res)
202             return jsonify(not_busy_json)
203
204
205 class WebSocketIoServer(object):
206     """This class takes care of the web socketio server, accepts websocket events, and sends back
207     notifications using websocket events (send_ methods). Caller should simply create an instance
208     of this class and pass a runner object then invoke the run method
209     """
210
211     def __init__(self, http_root, runner, fluent_logger):
212         self.nfvbench_runner = runner
213         setup_flask(http_root)
214         self.fluent_logger = fluent_logger
215
216     def run(self, host='127.0.0.1', port=7556):
217
218         # socketio.run will not return so we need to run it in a background thread so that
219         # the calling thread (main thread) can keep doing work
220         socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
221
222         # wait for run requests
223         # the runner must be executed from the main thread (Trex client library requirement)
224         while True:
225
226             # print 'main thread waiting for requests...'
227             config = Ctx.dequeue()
228             # print 'main thread processing request...'
229             print config
230             try:
231                 # remove unfilled values as we do not want them to override default values with None
232                 config = {k: v for k, v in config.items() if v is not None}
233                 with RunLock():
234                     if self.fluent_logger:
235                         self.fluent_logger.start_new_run()
236                     results = self.nfvbench_runner.run(config, config)
237             except Exception as exc:
238                 results = result_json(STATUS_ERROR, str(exc))
239                 LOG.exception('NFVbench runner exception:')
240
241             if Ctx.request_from_socketio:
242                 socketio.emit('run_end', results)
243             else:
244                 # this might overwrite a previously unfetched result
245                 Ctx.set_result(results)
246             summary = NFVBenchSummarizer(results['result'], self.fluent_logger)
247             LOG.info(str(summary))
248             Ctx.release()
249             if self.fluent_logger:
250                 self.fluent_logger.send_run_summary(True)
251
252     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
253         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
254         socketio.emit('run_interval_stats', stats)
255
256     def send_ndr_found(self, ndr_pps):
257         socketio.emit('ndr_found', {'rate_pps': ndr_pps})
258
259     def send_pdr_found(self, pdr_pps):
260         socketio.emit('pdr_found', {'rate_pps': pdr_pps})