NFVBENCH-25 Send run results to fluentd
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 from flask import Flask
18 from flask import jsonify
19 from flask import render_template
20 from flask import request
21
22 from flask_socketio import emit
23 from flask_socketio import SocketIO
24 from fluentd import FluentLogHandler
25 from summarizer import NFVBenchSummarizer
26
27 import json
28 from log import LOG
29 import Queue
30 import traceback
31 from utils import byteify
32 from utils import RunLock
33 import uuid
34
35 # this global cannot reside in Ctx because of the @app and @socketio decorators
36 app = None
37 socketio = None
38
39 STATUS_OK = 'OK'
40 STATUS_ERROR = 'ERROR'
41 STATUS_PENDING = 'PENDING'
42 STATUS_NOT_FOUND = 'NOT_FOUND'
43
44
45 def result_json(status, message, request_id=None):
46     body = {
47         'status': status,
48         'error_message': message
49     }
50
51     if request_id is not None:
52         body['request_id'] = request_id
53
54     return body
55
56
57 def load_json(data):
58     return json.loads(json.dumps(data), object_hook=byteify)
59
60
61 def get_uuid():
62     return uuid.uuid4().hex
63
64
65 class Ctx(object):
66     MAXLEN = 5
67     run_queue = Queue.Queue()
68     busy = False
69     result = None
70     request_from_socketio = False
71     results = {}
72     ids = []
73     current_id = None
74
75     @staticmethod
76     def enqueue(config, request_id, from_socketio=False):
77         Ctx.busy = True
78         Ctx.request_from_socketio = from_socketio
79         config['request_id'] = request_id
80         Ctx.run_queue.put(config)
81
82         if len(Ctx.ids) >= Ctx.MAXLEN:
83             try:
84                 del Ctx.results[Ctx.ids.pop(0)]
85             except KeyError:
86                 pass
87         Ctx.ids.append(request_id)
88
89     @staticmethod
90     def dequeue():
91         config = Ctx.run_queue.get()
92         Ctx.current_id = config['request_id']
93         return config
94
95     @staticmethod
96     def release():
97         Ctx.current_id = None
98         Ctx.busy = False
99
100     @staticmethod
101     def set_result(res):
102         res['request_id'] = Ctx.current_id
103         Ctx.results[Ctx.current_id] = res
104         Ctx.result = res
105
106     @staticmethod
107     def get_result(request_id=None):
108         if request_id:
109             try:
110                 res = Ctx.results[request_id]
111             except KeyError:
112                 return None
113
114             if Ctx.result and request_id == Ctx.result['request_id']:
115                 Ctx.result = None
116
117             return res
118         else:
119             res = Ctx.result
120             if res:
121                 Ctx.result = None
122             return res
123
124     @staticmethod
125     def is_busy():
126         return Ctx.busy
127
128     @staticmethod
129     def get_current_request_id():
130         return Ctx.current_id
131
132
133 def setup_flask(root_path):
134     global socketio
135     global app
136     app = Flask(__name__)
137     app.root_path = root_path
138     socketio = SocketIO(app, async_mode='threading')
139     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
140     config_is_null_json = result_json(STATUS_ERROR, 'configuration is missing')
141     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
142     not_found_msg = 'results not found'
143     pending_msg = 'NFVbench run still pending'
144
145     # --------- socketio requests ------------
146
147     @socketio.on('start_run')
148     def socketio_start_run(config):
149         if not Ctx.is_busy():
150             Ctx.enqueue(config, get_uuid(), from_socketio=True)
151         else:
152             emit('error', {'reason': 'there is already an NFVbench request running'})
153
154     @socketio.on('echo')
155     def socketio_echo(config):
156         emit('echo', config)
157
158     # --------- HTTP requests ------------
159
160     @app.route('/')
161     def index():
162         return render_template('index.html')
163
164     @app.route('/echo', methods=['GET'])
165     def echo():
166         config = request.json
167         return jsonify(config)
168
169     @app.route('/start_run', methods=['POST'])
170     def start_run():
171         config = load_json(request.json)
172         if config:
173             if Ctx.is_busy():
174                 return jsonify(busy_json)
175             else:
176                 request_id = get_uuid()
177                 Ctx.enqueue(config, request_id)
178                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
179         else:
180             return jsonify(config_is_null_json)
181
182     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
183     @app.route('/status/<request_id>', methods=['GET'])
184     def get_status(request_id):
185         if request_id:
186             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
187                 # task with request_id still pending
188                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
189
190             res = Ctx.get_result(request_id)
191             if res:
192                 # found result for given request_id
193                 return jsonify(res)
194             else:
195                 # result for given request_id not found
196                 return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
197         else:
198             if Ctx.is_busy():
199                 # task still pending, return with request_id
200                 return jsonify(result_json(STATUS_PENDING,
201                                            pending_msg,
202                                            Ctx.get_current_request_id()))
203
204             res = Ctx.get_result()
205             if res:
206                 return jsonify(res)
207             else:
208                 return jsonify(not_busy_json)
209
210
211 class WebSocketIoServer(object):
212     """This class takes care of the web socketio server, accepts websocket events, and sends back
213     notifications using websocket events (send_ methods). Caller should simply create an instance
214     of this class and pass a runner object then invoke the run method
215     """
216
217     def __init__(self, http_root, runner, logger):
218         self.nfvbench_runner = runner
219         setup_flask(http_root)
220         self.fluent_logger = logger
221         self.result_fluent_logger = FluentLogHandler("resultnfvbench",
222                                                      fluentd_ip=self.fluent_logger.sender.host,
223                                                      fluentd_port=self.fluent_logger.sender.port) \
224             if self.fluent_logger else None
225
226     def run(self, host='127.0.0.1', port=7556):
227
228         # socketio.run will not return so we need to run it in a background thread so that
229         # the calling thread (main thread) can keep doing work
230         socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
231
232         # wait for run requests
233         # the runner must be executed from the main thread (Trex client library requirement)
234         while True:
235
236             # print 'main thread waiting for requests...'
237             config = Ctx.dequeue()
238             # print 'main thread processing request...'
239             print config
240             try:
241                 # remove unfilled values as we do not want them to override default values with None
242                 config = {k: v for k, v in config.items() if v is not None}
243                 with RunLock():
244                     results = self.nfvbench_runner.run(config, config)
245             except Exception as exc:
246                 print 'NFVbench runner exception:'
247                 traceback.print_exc()
248                 results = result_json(STATUS_ERROR, str(exc))
249                 LOG.exception()
250
251             if Ctx.request_from_socketio:
252                 socketio.emit('run_end', results)
253             else:
254                 # this might overwrite a previously unfetched result
255                 Ctx.set_result(results)
256             summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
257             LOG.info(str(summary))
258             Ctx.release()
259             if self.fluent_logger:
260                 self.fluent_logger.send_run_summary(True)
261
262     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
263         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
264         socketio.emit('run_interval_stats', stats)
265
266     def send_ndr_found(self, ndr_pps):
267         socketio.emit('ndr_found', {'rate_pps': ndr_pps})
268
269     def send_pdr_found(self, pdr_pps):
270         socketio.emit('pdr_found', {'rate_pps': pdr_pps})