15b71c543ad1fbdbd68759ef59b4ec7d20a6b3b9
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 import json
18 import Queue
19 import traceback
20 import uuid
21
22 from flask import Flask
23 from flask import jsonify
24 from flask import render_template
25 from flask import request
26
27 from flask_socketio import emit
28 from flask_socketio import SocketIO
29 from fluentd import FluentLogHandler
30 from summarizer import NFVBenchSummarizer
31
32 from log import LOG
33 from utils import byteify
34 from utils import RunLock
35
36
37 # this global cannot reside in Ctx because of the @app and @socketio decorators
38 app = None
39 socketio = None
40
41 STATUS_OK = 'OK'
42 STATUS_ERROR = 'ERROR'
43 STATUS_PENDING = 'PENDING'
44 STATUS_NOT_FOUND = 'NOT_FOUND'
45
46
47 def result_json(status, message, request_id=None):
48     body = {
49         'status': status,
50         'error_message': message
51     }
52
53     if request_id is not None:
54         body['request_id'] = request_id
55
56     return body
57
58
59 def load_json(data):
60     return json.loads(json.dumps(data), object_hook=byteify)
61
62
63 def get_uuid():
64     return uuid.uuid4().hex
65
66
67 class Ctx(object):
68     MAXLEN = 5
69     run_queue = Queue.Queue()
70     busy = False
71     result = None
72     request_from_socketio = False
73     results = {}
74     ids = []
75     current_id = None
76
77     @staticmethod
78     def enqueue(config, request_id, from_socketio=False):
79         Ctx.busy = True
80         Ctx.request_from_socketio = from_socketio
81         config['request_id'] = request_id
82         Ctx.run_queue.put(config)
83
84         if len(Ctx.ids) >= Ctx.MAXLEN:
85             try:
86                 del Ctx.results[Ctx.ids.pop(0)]
87             except KeyError:
88                 pass
89         Ctx.ids.append(request_id)
90
91     @staticmethod
92     def dequeue():
93         config = Ctx.run_queue.get()
94         Ctx.current_id = config['request_id']
95         return config
96
97     @staticmethod
98     def release():
99         Ctx.current_id = None
100         Ctx.busy = False
101
102     @staticmethod
103     def set_result(res):
104         res['request_id'] = Ctx.current_id
105         Ctx.results[Ctx.current_id] = res
106         Ctx.result = res
107
108     @staticmethod
109     def get_result(request_id=None):
110         if request_id:
111             try:
112                 res = Ctx.results[request_id]
113             except KeyError:
114                 return None
115
116             if Ctx.result and request_id == Ctx.result['request_id']:
117                 Ctx.result = None
118
119             return res
120         else:
121             res = Ctx.result
122             if res:
123                 Ctx.result = None
124             return res
125
126     @staticmethod
127     def is_busy():
128         return Ctx.busy
129
130     @staticmethod
131     def get_current_request_id():
132         return Ctx.current_id
133
134
135 def setup_flask(root_path):
136     global socketio
137     global app
138     app = Flask(__name__)
139     app.root_path = root_path
140     socketio = SocketIO(app, async_mode='threading')
141     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
142     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
143     not_found_msg = 'results not found'
144     pending_msg = 'NFVbench run still pending'
145
146     # --------- socketio requests ------------
147
148     @socketio.on('start_run')
149     def _socketio_start_run(config):
150         if not Ctx.is_busy():
151             Ctx.enqueue(config, get_uuid(), from_socketio=True)
152         else:
153             emit('error', {'reason': 'there is already an NFVbench request running'})
154
155     @socketio.on('echo')
156     def _socketio_echo(config):
157         emit('echo', config)
158
159     # --------- HTTP requests ------------
160
161     @app.route('/')
162     def _index():
163         return render_template('index.html')
164
165     @app.route('/echo', methods=['GET'])
166     def _echo():
167         config = request.json
168         return jsonify(config)
169
170     @app.route('/start_run', methods=['POST'])
171     def _start_run():
172         config = load_json(request.json)
173         if not config:
174             config = {}
175         if Ctx.is_busy():
176             return jsonify(busy_json)
177         request_id = get_uuid()
178         Ctx.enqueue(config, request_id)
179         return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
180
181     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
182     @app.route('/status/<request_id>', methods=['GET'])
183     def _get_status(request_id):
184         if request_id:
185             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
186                 # task with request_id still pending
187                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
188
189             res = Ctx.get_result(request_id)
190             if res:
191                 # found result for given request_id
192                 return jsonify(res)
193             # result for given request_id not found
194             return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
195         else:
196             if Ctx.is_busy():
197                 # task still pending, return with request_id
198                 return jsonify(result_json(STATUS_PENDING,
199                                            pending_msg,
200                                            Ctx.get_current_request_id()))
201
202             res = Ctx.get_result()
203             if res:
204                 return jsonify(res)
205             return jsonify(not_busy_json)
206
207
208 class WebSocketIoServer(object):
209     """This class takes care of the web socketio server, accepts websocket events, and sends back
210     notifications using websocket events (send_ methods). Caller should simply create an instance
211     of this class and pass a runner object then invoke the run method
212     """
213
214     def __init__(self, http_root, runner, logger):
215         self.nfvbench_runner = runner
216         setup_flask(http_root)
217         self.fluent_logger = logger
218         self.result_fluent_logger = None
219         if self.fluent_logger:
220             self.result_fluent_logger = \
221                 FluentLogHandler("resultnfvbench",
222                                  fluentd_ip=self.fluent_logger.sender.host,
223                                  fluentd_port=self.fluent_logger.sender.port)
224             self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
225
226     def run(self, host='127.0.0.1', port=7556):
227
228         # socketio.run will not return so we need to run it in a background thread so that
229         # the calling thread (main thread) can keep doing work
230         socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
231
232         # wait for run requests
233         # the runner must be executed from the main thread (Trex client library requirement)
234         while True:
235
236             # print 'main thread waiting for requests...'
237             config = Ctx.dequeue()
238             # print 'main thread processing request...'
239             print config
240             try:
241                 # remove unfilled values as we do not want them to override default values with None
242                 config = {k: v for k, v in config.items() if v is not None}
243                 with RunLock():
244                     results = self.nfvbench_runner.run(config, config)
245             except Exception as exc:
246                 print 'NFVbench runner exception:'
247                 traceback.print_exc()
248                 results = result_json(STATUS_ERROR, str(exc))
249                 LOG.exception()
250
251             if Ctx.request_from_socketio:
252                 socketio.emit('run_end', results)
253             else:
254                 # this might overwrite a previously unfetched result
255                 Ctx.set_result(results)
256             if self.fluent_logger:
257                 self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
258             summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
259             LOG.info(str(summary))
260             Ctx.release()
261             if self.fluent_logger:
262                 self.fluent_logger.send_run_summary(True)
263
264     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
265         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
266         socketio.emit('run_interval_stats', stats)
267
268     def send_ndr_found(self, ndr_pps):
269         socketio.emit('ndr_found', {'rate_pps': ndr_pps})
270
271     def send_pdr_found(self, pdr_pps):
272         socketio.emit('pdr_found', {'rate_pps': pdr_pps})