NFVBENCH-42 Add multiple fluentd aggregators support
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 import json
18 import Queue
19 import traceback
20 import uuid
21
22 from flask import Flask
23 from flask import jsonify
24 from flask import render_template
25 from flask import request
26
27 from flask_socketio import emit
28 from flask_socketio import SocketIO
29 from summarizer import NFVBenchSummarizer
30
31 from log import LOG
32 from utils import byteify
33 from utils import RunLock
34
35 # this global cannot reside in Ctx because of the @app and @socketio decorators
36 app = None
37 socketio = None
38
39 STATUS_OK = 'OK'
40 STATUS_ERROR = 'ERROR'
41 STATUS_PENDING = 'PENDING'
42 STATUS_NOT_FOUND = 'NOT_FOUND'
43
44
45 def result_json(status, message, request_id=None):
46     body = {
47         'status': status,
48         'error_message': message
49     }
50
51     if request_id is not None:
52         body['request_id'] = request_id
53
54     return body
55
56
57 def load_json(data):
58     return json.loads(json.dumps(data), object_hook=byteify)
59
60
61 def get_uuid():
62     return uuid.uuid4().hex
63
64
65 class Ctx(object):
66     MAXLEN = 5
67     run_queue = Queue.Queue()
68     busy = False
69     result = None
70     request_from_socketio = False
71     results = {}
72     ids = []
73     current_id = None
74
75     @staticmethod
76     def enqueue(config, request_id, from_socketio=False):
77         Ctx.busy = True
78         Ctx.request_from_socketio = from_socketio
79         config['request_id'] = request_id
80         Ctx.run_queue.put(config)
81
82         if len(Ctx.ids) >= Ctx.MAXLEN:
83             try:
84                 del Ctx.results[Ctx.ids.pop(0)]
85             except KeyError:
86                 pass
87         Ctx.ids.append(request_id)
88
89     @staticmethod
90     def dequeue():
91         config = Ctx.run_queue.get()
92         Ctx.current_id = config['request_id']
93         return config
94
95     @staticmethod
96     def release():
97         Ctx.current_id = None
98         Ctx.busy = False
99
100     @staticmethod
101     def set_result(res):
102         res['request_id'] = Ctx.current_id
103         Ctx.results[Ctx.current_id] = res
104         Ctx.result = res
105
106     @staticmethod
107     def get_result(request_id=None):
108         if request_id:
109             try:
110                 res = Ctx.results[request_id]
111             except KeyError:
112                 return None
113
114             if Ctx.result and request_id == Ctx.result['request_id']:
115                 Ctx.result = None
116
117             return res
118         else:
119             res = Ctx.result
120             if res:
121                 Ctx.result = None
122             return res
123
124     @staticmethod
125     def is_busy():
126         return Ctx.busy
127
128     @staticmethod
129     def get_current_request_id():
130         return Ctx.current_id
131
132
133 def setup_flask(root_path):
134     global socketio
135     global app
136     app = Flask(__name__)
137     app.root_path = root_path
138     socketio = SocketIO(app, async_mode='threading')
139     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
140     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
141     not_found_msg = 'results not found'
142     pending_msg = 'NFVbench run still pending'
143
144     # --------- socketio requests ------------
145
146     @socketio.on('start_run')
147     def _socketio_start_run(config):
148         if not Ctx.is_busy():
149             Ctx.enqueue(config, get_uuid(), from_socketio=True)
150         else:
151             emit('error', {'reason': 'there is already an NFVbench request running'})
152
153     @socketio.on('echo')
154     def _socketio_echo(config):
155         emit('echo', config)
156
157     # --------- HTTP requests ------------
158
159     @app.route('/')
160     def _index():
161         return render_template('index.html')
162
163     @app.route('/echo', methods=['GET'])
164     def _echo():
165         config = request.json
166         return jsonify(config)
167
168     @app.route('/start_run', methods=['POST'])
169     def _start_run():
170         config = load_json(request.json)
171         if not config:
172             config = {}
173         if Ctx.is_busy():
174             return jsonify(busy_json)
175         request_id = get_uuid()
176         Ctx.enqueue(config, request_id)
177         return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
178
179     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
180     @app.route('/status/<request_id>', methods=['GET'])
181     def _get_status(request_id):
182         if request_id:
183             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
184                 # task with request_id still pending
185                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
186
187             res = Ctx.get_result(request_id)
188             if res:
189                 # found result for given request_id
190                 return jsonify(res)
191             # result for given request_id not found
192             return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
193         else:
194             if Ctx.is_busy():
195                 # task still pending, return with request_id
196                 return jsonify(result_json(STATUS_PENDING,
197                                            pending_msg,
198                                            Ctx.get_current_request_id()))
199
200             res = Ctx.get_result()
201             if res:
202                 return jsonify(res)
203             return jsonify(not_busy_json)
204
205
206 class WebSocketIoServer(object):
207     """This class takes care of the web socketio server, accepts websocket events, and sends back
208     notifications using websocket events (send_ methods). Caller should simply create an instance
209     of this class and pass a runner object then invoke the run method
210     """
211
212     def __init__(self, http_root, runner, fluent_logger):
213         self.nfvbench_runner = runner
214         setup_flask(http_root)
215         self.fluent_logger = fluent_logger
216
217     def run(self, host='127.0.0.1', port=7556):
218
219         # socketio.run will not return so we need to run it in a background thread so that
220         # the calling thread (main thread) can keep doing work
221         socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
222
223         # wait for run requests
224         # the runner must be executed from the main thread (Trex client library requirement)
225         while True:
226
227             # print 'main thread waiting for requests...'
228             config = Ctx.dequeue()
229             # print 'main thread processing request...'
230             print config
231             try:
232                 # remove unfilled values as we do not want them to override default values with None
233                 config = {k: v for k, v in config.items() if v is not None}
234                 with RunLock():
235                     if self.fluent_logger:
236                         self.fluent_logger.start_new_run()
237                     results = self.nfvbench_runner.run(config, config)
238             except Exception as exc:
239                 print 'NFVbench runner exception:'
240                 traceback.print_exc()
241                 results = result_json(STATUS_ERROR, str(exc))
242                 LOG.exception()
243
244             if Ctx.request_from_socketio:
245                 socketio.emit('run_end', results)
246             else:
247                 # this might overwrite a previously unfetched result
248                 Ctx.set_result(results)
249             summary = NFVBenchSummarizer(results['result'], self.fluent_logger)
250             LOG.info(str(summary))
251             Ctx.release()
252             if self.fluent_logger:
253                 self.fluent_logger.send_run_summary(True)
254
255     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
256         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
257         socketio.emit('run_interval_stats', stats)
258
259     def send_ndr_found(self, ndr_pps):
260         socketio.emit('ndr_found', {'rate_pps': ndr_pps})
261
262     def send_pdr_found(self, pdr_pps):
263         socketio.emit('pdr_found', {'rate_pps': pdr_pps})