NFVBENCH-12 Add run summary to fluentd stream
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 from flask import Flask
18 from flask import jsonify
19 from flask import render_template
20 from flask import request
21
22 from flask_socketio import emit
23 from flask_socketio import SocketIO
24
25 import json
26 from log import LOG
27 import Queue
28 import traceback
29 from utils import byteify
30 from utils import RunLock
31 import uuid
32
33 # this global cannot reside in Ctx because of the @app and @socketio decorators
34 app = None
35 socketio = None
36
37 STATUS_OK = 'OK'
38 STATUS_ERROR = 'ERROR'
39 STATUS_PENDING = 'PENDING'
40 STATUS_NOT_FOUND = 'NOT_FOUND'
41
42
43 def result_json(status, message, request_id=None):
44     body = {
45         'status': status,
46         'error_message': message
47     }
48
49     if request_id is not None:
50         body['request_id'] = request_id
51
52     return body
53
54
55 def load_json(data):
56     return json.loads(json.dumps(data), object_hook=byteify)
57
58
59 def get_uuid():
60     return uuid.uuid4().hex
61
62
63 class Ctx(object):
64     MAXLEN = 5
65     run_queue = Queue.Queue()
66     busy = False
67     result = None
68     request_from_socketio = False
69     results = {}
70     ids = []
71     current_id = None
72
73     @staticmethod
74     def enqueue(config, request_id, from_socketio=False):
75         Ctx.busy = True
76         Ctx.request_from_socketio = from_socketio
77         config['request_id'] = request_id
78         Ctx.run_queue.put(config)
79
80         if len(Ctx.ids) >= Ctx.MAXLEN:
81             try:
82                 del Ctx.results[Ctx.ids.pop(0)]
83             except KeyError:
84                 pass
85         Ctx.ids.append(request_id)
86
87     @staticmethod
88     def dequeue():
89         config = Ctx.run_queue.get()
90         Ctx.current_id = config['request_id']
91         return config
92
93     @staticmethod
94     def release():
95         Ctx.current_id = None
96         Ctx.busy = False
97
98     @staticmethod
99     def set_result(res):
100         res['request_id'] = Ctx.current_id
101         Ctx.results[Ctx.current_id] = res
102         Ctx.result = res
103
104     @staticmethod
105     def get_result(request_id=None):
106         if request_id:
107             try:
108                 res = Ctx.results[request_id]
109             except KeyError:
110                 return None
111
112             if Ctx.result and request_id == Ctx.result['request_id']:
113                 Ctx.result = None
114
115             return res
116         else:
117             res = Ctx.result
118             if res:
119                 Ctx.result = None
120             return res
121
122     @staticmethod
123     def is_busy():
124         return Ctx.busy
125
126     @staticmethod
127     def get_current_request_id():
128         return Ctx.current_id
129
130
131 def setup_flask(root_path):
132     global socketio
133     global app
134     app = Flask(__name__)
135     app.root_path = root_path
136     socketio = SocketIO(app, async_mode='threading')
137     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
138     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
139     not_found_msg = 'results not found'
140     pending_msg = 'NFVbench run still pending'
141
142     # --------- socketio requests ------------
143
144     @socketio.on('start_run')
145     def socketio_start_run(config):
146         if not Ctx.is_busy():
147             Ctx.enqueue(config, get_uuid(), from_socketio=True)
148         else:
149             emit('error', {'reason': 'there is already an NFVbench request running'})
150
151     @socketio.on('echo')
152     def socketio_echo(config):
153         emit('echo', config)
154
155     # --------- HTTP requests ------------
156
157     @app.route('/')
158     def index():
159         return render_template('index.html')
160
161     @app.route('/echo', methods=['GET'])
162     def echo():
163         config = request.json
164         return jsonify(config)
165
166     @app.route('/start_run', methods=['POST'])
167     def start_run():
168         config = load_json(request.json)
169         if Ctx.is_busy():
170             return jsonify(busy_json)
171         else:
172             request_id = get_uuid()
173             Ctx.enqueue(config, request_id)
174             return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
175
176     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
177     @app.route('/status/<request_id>', methods=['GET'])
178     def get_status(request_id):
179         if request_id:
180             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
181                 # task with request_id still pending
182                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
183
184             res = Ctx.get_result(request_id)
185             if res:
186                 # found result for given request_id
187                 return jsonify(res)
188             else:
189                 # result for given request_id not found
190                 return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
191         else:
192             if Ctx.is_busy():
193                 # task still pending, return with request_id
194                 return jsonify(result_json(STATUS_PENDING,
195                                            pending_msg,
196                                            Ctx.get_current_request_id()))
197
198             res = Ctx.get_result()
199             if res:
200                 return jsonify(res)
201             else:
202                 return jsonify(not_busy_json)
203
204
205 class WebSocketIoServer(object):
206     """This class takes care of the web socketio server, accepts websocket events, and sends back
207     notifications using websocket events (send_ methods). Caller should simply create an instance
208     of this class and pass a runner object then invoke the run method
209     """
210     def __init__(self, http_root, runner, logger):
211         self.nfvbench_runner = runner
212         setup_flask(http_root)
213         self.fluent_logger = logger
214
215     def run(self, host='127.0.0.1', port=7556):
216
217         # socketio.run will not return so we need to run it in a background thread so that
218         # the calling thread (main thread) can keep doing work
219         socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
220
221         # wait for run requests
222         # the runner must be executed from the main thread (Trex client library requirement)
223         while True:
224
225             # print 'main thread waiting for requests...'
226             config = Ctx.dequeue()
227             # print 'main thread processing request...'
228             print config
229             try:
230                 # remove unfilled values as we do not want them to override default values with None
231                 config = {k: v for k, v in config.items() if v is not None}
232                 with RunLock():
233                     results = self.nfvbench_runner.run(config, config)
234             except Exception as exc:
235                 print 'NFVbench runner exception:'
236                 traceback.print_exc()
237                 results = result_json(STATUS_ERROR, str(exc))
238                 LOG.exception()
239
240             if Ctx.request_from_socketio:
241                 socketio.emit('run_end', results)
242             else:
243                 # this might overwrite a previously unfetched result
244                 Ctx.set_result(results)
245             Ctx.release()
246             self.fluent_logger.send_run_summary(True)
247
248     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
249         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
250         socketio.emit('run_interval_stats', stats)
251
252     def send_ndr_found(self, ndr_pps):
253         socketio.emit('ndr_found', {'rate_pps': ndr_pps})
254
255     def send_pdr_found(self, pdr_pps):
256         socketio.emit('pdr_found', {'rate_pps': pdr_pps})