NFVBENCH-41 Add fluentd result tag to nfvbench server
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 import json
18 import Queue
19 import traceback
20 import uuid
21
22 from flask import Flask
23 from flask import jsonify
24 from flask import render_template
25 from flask import request
26
27 from flask_socketio import emit
28 from flask_socketio import SocketIO
29 from fluentd import FluentLogHandler
30 from summarizer import NFVBenchSummarizer
31
32 from log import LOG
33 from utils import byteify
34 from utils import RunLock
35
36 # this global cannot reside in Ctx because of the @app and @socketio decorators
37 app = None
38 socketio = None
39
40 STATUS_OK = 'OK'
41 STATUS_ERROR = 'ERROR'
42 STATUS_PENDING = 'PENDING'
43 STATUS_NOT_FOUND = 'NOT_FOUND'
44
45
46 def result_json(status, message, request_id=None):
47     body = {
48         'status': status,
49         'error_message': message
50     }
51
52     if request_id is not None:
53         body['request_id'] = request_id
54
55     return body
56
57
58 def load_json(data):
59     return json.loads(json.dumps(data), object_hook=byteify)
60
61
62 def get_uuid():
63     return uuid.uuid4().hex
64
65
66 class Ctx(object):
67     MAXLEN = 5
68     run_queue = Queue.Queue()
69     busy = False
70     result = None
71     request_from_socketio = False
72     results = {}
73     ids = []
74     current_id = None
75
76     @staticmethod
77     def enqueue(config, request_id, from_socketio=False):
78         Ctx.busy = True
79         Ctx.request_from_socketio = from_socketio
80         config['request_id'] = request_id
81         Ctx.run_queue.put(config)
82
83         if len(Ctx.ids) >= Ctx.MAXLEN:
84             try:
85                 del Ctx.results[Ctx.ids.pop(0)]
86             except KeyError:
87                 pass
88         Ctx.ids.append(request_id)
89
90     @staticmethod
91     def dequeue():
92         config = Ctx.run_queue.get()
93         Ctx.current_id = config['request_id']
94         return config
95
96     @staticmethod
97     def release():
98         Ctx.current_id = None
99         Ctx.busy = False
100
101     @staticmethod
102     def set_result(res):
103         res['request_id'] = Ctx.current_id
104         Ctx.results[Ctx.current_id] = res
105         Ctx.result = res
106
107     @staticmethod
108     def get_result(request_id=None):
109         if request_id:
110             try:
111                 res = Ctx.results[request_id]
112             except KeyError:
113                 return None
114
115             if Ctx.result and request_id == Ctx.result['request_id']:
116                 Ctx.result = None
117
118             return res
119         else:
120             res = Ctx.result
121             if res:
122                 Ctx.result = None
123             return res
124
125     @staticmethod
126     def is_busy():
127         return Ctx.busy
128
129     @staticmethod
130     def get_current_request_id():
131         return Ctx.current_id
132
133
134 def setup_flask(root_path):
135     global socketio
136     global app
137     app = Flask(__name__)
138     app.root_path = root_path
139     socketio = SocketIO(app, async_mode='threading')
140     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
141     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
142     not_found_msg = 'results not found'
143     pending_msg = 'NFVbench run still pending'
144
145     # --------- socketio requests ------------
146
147     @socketio.on('start_run')
148     def _socketio_start_run(config):
149         if not Ctx.is_busy():
150             Ctx.enqueue(config, get_uuid(), from_socketio=True)
151         else:
152             emit('error', {'reason': 'there is already an NFVbench request running'})
153
154     @socketio.on('echo')
155     def _socketio_echo(config):
156         emit('echo', config)
157
158     # --------- HTTP requests ------------
159
160     @app.route('/')
161     def _index():
162         return render_template('index.html')
163
164     @app.route('/echo', methods=['GET'])
165     def _echo():
166         config = request.json
167         return jsonify(config)
168
169     @app.route('/start_run', methods=['POST'])
170     def _start_run():
171         config = load_json(request.json)
172         if not config:
173             config = {}
174         if Ctx.is_busy():
175             return jsonify(busy_json)
176         request_id = get_uuid()
177         Ctx.enqueue(config, request_id)
178         return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
179
180     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
181     @app.route('/status/<request_id>', methods=['GET'])
182     def _get_status(request_id):
183         if request_id:
184             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
185                 # task with request_id still pending
186                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
187
188             res = Ctx.get_result(request_id)
189             if res:
190                 # found result for given request_id
191                 return jsonify(res)
192             # result for given request_id not found
193             return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
194         else:
195             if Ctx.is_busy():
196                 # task still pending, return with request_id
197                 return jsonify(result_json(STATUS_PENDING,
198                                            pending_msg,
199                                            Ctx.get_current_request_id()))
200
201             res = Ctx.get_result()
202             if res:
203                 return jsonify(res)
204             return jsonify(not_busy_json)
205
206
207 class WebSocketIoServer(object):
208     """This class takes care of the web socketio server, accepts websocket events, and sends back
209     notifications using websocket events (send_ methods). Caller should simply create an instance
210     of this class and pass a runner object then invoke the run method
211     """
212
213     def __init__(self, http_root, runner, logger, result_tag):
214         self.nfvbench_runner = runner
215         setup_flask(http_root)
216         self.fluent_logger = logger
217         self.result_fluent_logger = None
218         if result_tag:
219             self.result_fluent_logger = \
220                 FluentLogHandler(result_tag,
221                                  fluentd_ip=self.fluent_logger.sender.host,
222                                  fluentd_port=self.fluent_logger.sender.port)
223             self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
224
225     def run(self, host='127.0.0.1', port=7556):
226
227         # socketio.run will not return so we need to run it in a background thread so that
228         # the calling thread (main thread) can keep doing work
229         socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
230
231         # wait for run requests
232         # the runner must be executed from the main thread (Trex client library requirement)
233         while True:
234
235             # print 'main thread waiting for requests...'
236             config = Ctx.dequeue()
237             # print 'main thread processing request...'
238             print config
239             try:
240                 # remove unfilled values as we do not want them to override default values with None
241                 config = {k: v for k, v in config.items() if v is not None}
242                 with RunLock():
243                     results = self.nfvbench_runner.run(config, config)
244             except Exception as exc:
245                 print 'NFVbench runner exception:'
246                 traceback.print_exc()
247                 results = result_json(STATUS_ERROR, str(exc))
248                 LOG.exception()
249
250             if Ctx.request_from_socketio:
251                 socketio.emit('run_end', results)
252             else:
253                 # this might overwrite a previously unfetched result
254                 Ctx.set_result(results)
255             if self.fluent_logger:
256                 self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
257             summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
258             LOG.info(str(summary))
259             Ctx.release()
260             if self.fluent_logger:
261                 self.fluent_logger.send_run_summary(True)
262
263     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
264         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
265         socketio.emit('run_interval_stats', stats)
266
267     def send_ndr_found(self, ndr_pps):
268         socketio.emit('ndr_found', {'rate_pps': ndr_pps})
269
270     def send_pdr_found(self, pdr_pps):
271         socketio.emit('pdr_found', {'rate_pps': pdr_pps})