3534950b06ed888d78eba4a04d164385fffa8095
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 from flask import Flask
18 from flask import jsonify
19 from flask import render_template
20 from flask import request
21
22 from flask_socketio import emit
23 from flask_socketio import SocketIO
24
25 import json
26 from log import LOG
27 import Queue
28 import traceback
29 from utils import byteify
30 from utils import RunLock
31 import uuid
32
33 # this global cannot reside in Ctx because of the @app and @socketio decorators
34 app = None
35 socketio = None
36
37 STATUS_OK = 'OK'
38 STATUS_ERROR = 'ERROR'
39 STATUS_PENDING = 'PENDING'
40 STATUS_NOT_FOUND = 'NOT_FOUND'
41
42
43 def result_json(status, message, request_id=None):
44     body = {
45         'status': status,
46         'error_message': message
47     }
48
49     if request_id is not None:
50         body['request_id'] = request_id
51
52     return body
53
54
55 def load_json(data):
56     return json.loads(json.dumps(data), object_hook=byteify)
57
58
59 def get_uuid():
60     return uuid.uuid4().hex
61
62
63 class Ctx(object):
64     MAXLEN = 5
65     run_queue = Queue.Queue()
66     busy = False
67     result = None
68     request_from_socketio = False
69     results = {}
70     ids = []
71     current_id = None
72
73     @staticmethod
74     def enqueue(config, request_id, from_socketio=False):
75         Ctx.busy = True
76         Ctx.request_from_socketio = from_socketio
77         config['request_id'] = request_id
78         Ctx.run_queue.put(config)
79
80         if len(Ctx.ids) >= Ctx.MAXLEN:
81             try:
82                 del Ctx.results[Ctx.ids.pop(0)]
83             except KeyError:
84                 pass
85         Ctx.ids.append(request_id)
86
87     @staticmethod
88     def dequeue():
89         config = Ctx.run_queue.get()
90         Ctx.current_id = config['request_id']
91         return config
92
93     @staticmethod
94     def release():
95         Ctx.current_id = None
96         Ctx.busy = False
97
98     @staticmethod
99     def set_result(res):
100         res['request_id'] = Ctx.current_id
101         Ctx.results[Ctx.current_id] = res
102         Ctx.result = res
103
104     @staticmethod
105     def get_result(request_id=None):
106         if request_id:
107             try:
108                 res = Ctx.results[request_id]
109             except KeyError:
110                 return None
111
112             if Ctx.result and request_id == Ctx.result['request_id']:
113                 Ctx.result = None
114
115             return res
116         else:
117             res = Ctx.result
118             if res:
119                 Ctx.result = None
120             return res
121
122     @staticmethod
123     def is_busy():
124         return Ctx.busy
125
126     @staticmethod
127     def get_current_request_id():
128         return Ctx.current_id
129
130
131 def setup_flask(root_path):
132     global socketio
133     global app
134     app = Flask(__name__)
135     app.root_path = root_path
136     socketio = SocketIO(app, async_mode='threading')
137     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
138     config_is_null_json = result_json(STATUS_ERROR, 'configuration is missing')
139     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
140     not_found_msg = 'results not found'
141     pending_msg = 'NFVbench run still pending'
142
143     # --------- socketio requests ------------
144
145     @socketio.on('start_run')
146     def socketio_start_run(config):
147         if not Ctx.is_busy():
148             Ctx.enqueue(config, get_uuid(), from_socketio=True)
149         else:
150             emit('error', {'reason': 'there is already an NFVbench request running'})
151
152     @socketio.on('echo')
153     def socketio_echo(config):
154         emit('echo', config)
155
156     # --------- HTTP requests ------------
157
158     @app.route('/')
159     def index():
160         return render_template('index.html')
161
162     @app.route('/echo', methods=['GET'])
163     def echo():
164         config = request.json
165         return jsonify(config)
166
167     @app.route('/start_run', methods=['POST'])
168     def start_run():
169         config = load_json(request.json)
170         if config:
171             if Ctx.is_busy():
172                 return jsonify(busy_json)
173             else:
174                 request_id = get_uuid()
175                 Ctx.enqueue(config, request_id)
176                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
177         else:
178             return jsonify(config_is_null_json)
179
180     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
181     @app.route('/status/<request_id>', methods=['GET'])
182     def get_status(request_id):
183         if request_id:
184             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
185                 # task with request_id still pending
186                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
187
188             res = Ctx.get_result(request_id)
189             if res:
190                 # found result for given request_id
191                 return jsonify(res)
192             else:
193                 # result for given request_id not found
194                 return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
195         else:
196             if Ctx.is_busy():
197                 # task still pending, return with request_id
198                 return jsonify(result_json(STATUS_PENDING,
199                                            pending_msg,
200                                            Ctx.get_current_request_id()))
201
202             res = Ctx.get_result()
203             if res:
204                 return jsonify(res)
205             else:
206                 return jsonify(not_busy_json)
207
208
209 class WebSocketIoServer(object):
210     """This class takes care of the web socketio server, accepts websocket events, and sends back
211     notifications using websocket events (send_ methods). Caller should simply create an instance
212     of this class and pass a runner object then invoke the run method
213     """
214     def __init__(self, http_root, runner, logger):
215         self.nfvbench_runner = runner
216         setup_flask(http_root)
217         self.fluent_logger = logger
218
219     def run(self, host='127.0.0.1', port=7556):
220
221         # socketio.run will not return so we need to run it in a background thread so that
222         # the calling thread (main thread) can keep doing work
223         socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
224
225         # wait for run requests
226         # the runner must be executed from the main thread (Trex client library requirement)
227         while True:
228
229             # print 'main thread waiting for requests...'
230             config = Ctx.dequeue()
231             # print 'main thread processing request...'
232             print config
233             try:
234                 # remove unfilled values as we do not want them to override default values with None
235                 config = {k: v for k, v in config.items() if v is not None}
236                 with RunLock():
237                     results = self.nfvbench_runner.run(config, config)
238             except Exception as exc:
239                 print 'NFVbench runner exception:'
240                 traceback.print_exc()
241                 results = result_json(STATUS_ERROR, str(exc))
242                 LOG.exception()
243
244             if Ctx.request_from_socketio:
245                 socketio.emit('run_end', results)
246             else:
247                 # this might overwrite a previously unfetched result
248                 Ctx.set_result(results)
249             Ctx.release()
250             self.fluent_logger.send_run_summary(True)
251
252     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
253         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
254         socketio.emit('run_interval_stats', stats)
255
256     def send_ndr_found(self, ndr_pps):
257         socketio.emit('ndr_found', {'rate_pps': ndr_pps})
258
259     def send_pdr_found(self, pdr_pps):
260         socketio.emit('pdr_found', {'rate_pps': pdr_pps})