NFVBENCH-34 Pass configuration file to entrypoint script
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 from flask import Flask
18 from flask import jsonify
19 from flask import render_template
20 from flask import request
21
22 from flask_socketio import emit
23 from flask_socketio import SocketIO
24 from fluentd import FluentLogHandler
25 from summarizer import NFVBenchSummarizer
26
27 import json
28 from log import LOG
29 import Queue
30 import traceback
31 from utils import byteify
32 from utils import RunLock
33 import uuid
34
35 # this global cannot reside in Ctx because of the @app and @socketio decorators
36 app = None
37 socketio = None
38
39 STATUS_OK = 'OK'
40 STATUS_ERROR = 'ERROR'
41 STATUS_PENDING = 'PENDING'
42 STATUS_NOT_FOUND = 'NOT_FOUND'
43
44
45 def result_json(status, message, request_id=None):
46     body = {
47         'status': status,
48         'error_message': message
49     }
50
51     if request_id is not None:
52         body['request_id'] = request_id
53
54     return body
55
56
57 def load_json(data):
58     return json.loads(json.dumps(data), object_hook=byteify)
59
60
61 def get_uuid():
62     return uuid.uuid4().hex
63
64
65 class Ctx(object):
66     MAXLEN = 5
67     run_queue = Queue.Queue()
68     busy = False
69     result = None
70     request_from_socketio = False
71     results = {}
72     ids = []
73     current_id = None
74
75     @staticmethod
76     def enqueue(config, request_id, from_socketio=False):
77         Ctx.busy = True
78         Ctx.request_from_socketio = from_socketio
79         config['request_id'] = request_id
80         Ctx.run_queue.put(config)
81
82         if len(Ctx.ids) >= Ctx.MAXLEN:
83             try:
84                 del Ctx.results[Ctx.ids.pop(0)]
85             except KeyError:
86                 pass
87         Ctx.ids.append(request_id)
88
89     @staticmethod
90     def dequeue():
91         config = Ctx.run_queue.get()
92         Ctx.current_id = config['request_id']
93         return config
94
95     @staticmethod
96     def release():
97         Ctx.current_id = None
98         Ctx.busy = False
99
100     @staticmethod
101     def set_result(res):
102         res['request_id'] = Ctx.current_id
103         Ctx.results[Ctx.current_id] = res
104         Ctx.result = res
105
106     @staticmethod
107     def get_result(request_id=None):
108         if request_id:
109             try:
110                 res = Ctx.results[request_id]
111             except KeyError:
112                 return None
113
114             if Ctx.result and request_id == Ctx.result['request_id']:
115                 Ctx.result = None
116
117             return res
118         else:
119             res = Ctx.result
120             if res:
121                 Ctx.result = None
122             return res
123
124     @staticmethod
125     def is_busy():
126         return Ctx.busy
127
128     @staticmethod
129     def get_current_request_id():
130         return Ctx.current_id
131
132
133 def setup_flask(root_path):
134     global socketio
135     global app
136     app = Flask(__name__)
137     app.root_path = root_path
138     socketio = SocketIO(app, async_mode='threading')
139     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
140     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
141     not_found_msg = 'results not found'
142     pending_msg = 'NFVbench run still pending'
143
144     # --------- socketio requests ------------
145
146     @socketio.on('start_run')
147     def socketio_start_run(config):
148         if not Ctx.is_busy():
149             Ctx.enqueue(config, get_uuid(), from_socketio=True)
150         else:
151             emit('error', {'reason': 'there is already an NFVbench request running'})
152
153     @socketio.on('echo')
154     def socketio_echo(config):
155         emit('echo', config)
156
157     # --------- HTTP requests ------------
158
159     @app.route('/')
160     def index():
161         return render_template('index.html')
162
163     @app.route('/echo', methods=['GET'])
164     def echo():
165         config = request.json
166         return jsonify(config)
167
168     @app.route('/start_run', methods=['POST'])
169     def start_run():
170         config = load_json(request.json)
171         if not config:
172             config = {}
173         if Ctx.is_busy():
174             return jsonify(busy_json)
175         request_id = get_uuid()
176         Ctx.enqueue(config, request_id)
177         return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
178
179     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
180     @app.route('/status/<request_id>', methods=['GET'])
181     def get_status(request_id):
182         if request_id:
183             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
184                 # task with request_id still pending
185                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
186
187             res = Ctx.get_result(request_id)
188             if res:
189                 # found result for given request_id
190                 return jsonify(res)
191             else:
192                 # result for given request_id not found
193                 return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
194         else:
195             if Ctx.is_busy():
196                 # task still pending, return with request_id
197                 return jsonify(result_json(STATUS_PENDING,
198                                            pending_msg,
199                                            Ctx.get_current_request_id()))
200
201             res = Ctx.get_result()
202             if res:
203                 return jsonify(res)
204             else:
205                 return jsonify(not_busy_json)
206
207
208 class WebSocketIoServer(object):
209     """This class takes care of the web socketio server, accepts websocket events, and sends back
210     notifications using websocket events (send_ methods). Caller should simply create an instance
211     of this class and pass a runner object then invoke the run method
212     """
213
214     def __init__(self, http_root, runner, logger):
215         self.nfvbench_runner = runner
216         setup_flask(http_root)
217         self.fluent_logger = logger
218         self.result_fluent_logger = FluentLogHandler("resultnfvbench",
219                                                      fluentd_ip=self.fluent_logger.sender.host,
220                                                      fluentd_port=self.fluent_logger.sender.port) \
221             if self.fluent_logger else None
222
223     def run(self, host='127.0.0.1', port=7556):
224
225         # socketio.run will not return so we need to run it in a background thread so that
226         # the calling thread (main thread) can keep doing work
227         socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
228
229         # wait for run requests
230         # the runner must be executed from the main thread (Trex client library requirement)
231         while True:
232
233             # print 'main thread waiting for requests...'
234             config = Ctx.dequeue()
235             # print 'main thread processing request...'
236             print config
237             try:
238                 # remove unfilled values as we do not want them to override default values with None
239                 config = {k: v for k, v in config.items() if v is not None}
240                 with RunLock():
241                     results = self.nfvbench_runner.run(config, config)
242             except Exception as exc:
243                 print 'NFVbench runner exception:'
244                 traceback.print_exc()
245                 results = result_json(STATUS_ERROR, str(exc))
246                 LOG.exception()
247
248             if Ctx.request_from_socketio:
249                 socketio.emit('run_end', results)
250             else:
251                 # this might overwrite a previously unfetched result
252                 Ctx.set_result(results)
253             summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
254             LOG.info(str(summary))
255             Ctx.release()
256             if self.fluent_logger:
257                 self.fluent_logger.send_run_summary(True)
258
259     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
260         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
261         socketio.emit('run_interval_stats', stats)
262
263     def send_ndr_found(self, ndr_pps):
264         socketio.emit('ndr_found', {'rate_pps': ndr_pps})
265
266     def send_pdr_found(self, pdr_pps):
267         socketio.emit('pdr_found', {'rate_pps': pdr_pps})