Initial code drop from Cisco
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 from flask import Flask
18 from flask import jsonify
19 from flask import render_template
20 from flask import request
21
22 from flask_socketio import emit
23 from flask_socketio import SocketIO
24
25 import json
26 import Queue
27 import traceback
28 from utils import byteify
29 from utils import RunLock
30 import uuid
31
32 # this global cannot reside in Ctx because of the @app and @socketio decorators
33 app = None
34 socketio = None
35
36 STATUS_OK = 'OK'
37 STATUS_ERROR = 'ERROR'
38 STATUS_PENDING = 'PENDING'
39 STATUS_NOT_FOUND = 'NOT_FOUND'
40
41
42 def result_json(status, message, request_id=None):
43     body = {
44         'status': status,
45         'error_message': message
46     }
47
48     if request_id is not None:
49         body['request_id'] = request_id
50
51     return body
52
53
54 def load_json(data):
55     return json.loads(json.dumps(data), object_hook=byteify)
56
57
58 def get_uuid():
59     return uuid.uuid4().hex
60
61
62 class Ctx(object):
63     MAXLEN = 5
64     run_queue = Queue.Queue()
65     busy = False
66     result = None
67     request_from_socketio = False
68     results = {}
69     ids = []
70     current_id = None
71
72     @staticmethod
73     def enqueue(config, request_id, from_socketio=False):
74         Ctx.busy = True
75         Ctx.request_from_socketio = from_socketio
76         config['request_id'] = request_id
77         Ctx.run_queue.put(config)
78
79         if len(Ctx.ids) >= Ctx.MAXLEN:
80             try:
81                 del Ctx.results[Ctx.ids.pop(0)]
82             except KeyError:
83                 pass
84         Ctx.ids.append(request_id)
85
86     @staticmethod
87     def dequeue():
88         config = Ctx.run_queue.get()
89         Ctx.current_id = config['request_id']
90         return config
91
92     @staticmethod
93     def release():
94         Ctx.current_id = None
95         Ctx.busy = False
96
97     @staticmethod
98     def set_result(res):
99         res['request_id'] = Ctx.current_id
100         Ctx.results[Ctx.current_id] = res
101         Ctx.result = res
102
103     @staticmethod
104     def get_result(request_id=None):
105         if request_id:
106             try:
107                 res = Ctx.results[request_id]
108             except KeyError:
109                 return None
110
111             if Ctx.result and request_id == Ctx.result['request_id']:
112                 Ctx.result = None
113
114             return res
115         else:
116             res = Ctx.result
117             if res:
118                 Ctx.result = None
119             return res
120
121     @staticmethod
122     def is_busy():
123         return Ctx.busy
124
125     @staticmethod
126     def get_current_request_id():
127         return Ctx.current_id
128
129
130 def setup_flask(root_path):
131     global socketio
132     global app
133     app = Flask(__name__)
134     app.root_path = root_path
135     socketio = SocketIO(app, async_mode='threading')
136     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
137     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
138     not_found_msg = 'results not found'
139     pending_msg = 'NFVbench run still pending'
140
141     # --------- socketio requests ------------
142
143     @socketio.on('start_run')
144     def socketio_start_run(config):
145         if not Ctx.is_busy():
146             Ctx.enqueue(config, get_uuid(), from_socketio=True)
147         else:
148             emit('error', {'reason': 'there is already an NFVbench request running'})
149
150     @socketio.on('echo')
151     def socketio_echo(config):
152         emit('echo', config)
153
154     # --------- HTTP requests ------------
155
156     @app.route('/')
157     def index():
158         return render_template('index.html')
159
160     @app.route('/echo', methods=['GET'])
161     def echo():
162         config = request.json
163         return jsonify(config)
164
165     @app.route('/start_run', methods=['POST'])
166     def start_run():
167         config = load_json(request.json)
168         if Ctx.is_busy():
169             return jsonify(busy_json)
170         else:
171             request_id = get_uuid()
172             Ctx.enqueue(config, request_id)
173             return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
174
175     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
176     @app.route('/status/<request_id>', methods=['GET'])
177     def get_status(request_id):
178         if request_id:
179             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
180                 # task with request_id still pending
181                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
182
183             res = Ctx.get_result(request_id)
184             if res:
185                 # found result for given request_id
186                 return jsonify(res)
187             else:
188                 # result for given request_id not found
189                 return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
190         else:
191             if Ctx.is_busy():
192                 # task still pending, return with request_id
193                 return jsonify(result_json(STATUS_PENDING,
194                                            pending_msg,
195                                            Ctx.get_current_request_id()))
196
197             res = Ctx.get_result()
198             if res:
199                 return jsonify(res)
200             else:
201                 return jsonify(not_busy_json)
202
203
204 class WebSocketIoServer(object):
205     """This class takes care of the web socketio server, accepts websocket events, and sends back
206     notifications using websocket events (send_ methods). Caller should simply create an instance
207     of this class and pass a runner object then invoke the run method
208     """
209     def __init__(self, http_root, runner):
210         self.nfvbench_runner = runner
211         setup_flask(http_root)
212
213     def run(self, host='127.0.0.1', port=7556):
214
215         # socketio.run will not return so we need to run it in a background thread so that
216         # the calling thread (main thread) can keep doing work
217         socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
218
219         # wait for run requests
220         # the runner must be executed from the main thread (Trex client library requirement)
221         while True:
222             # print 'main thread waiting for requests...'
223             config = Ctx.dequeue()
224             # print 'main thread processing request...'
225             print config
226             try:
227                 # remove unfilled values as we do not want them to override default values with None
228                 config = {k: v for k, v in config.items() if v is not None}
229                 with RunLock():
230                     results = self.nfvbench_runner.run(config)
231             except Exception as exc:
232                 print 'NFVbench runner exception:'
233                 traceback.print_exc()
234                 results = result_json(STATUS_ERROR, str(exc))
235
236             if Ctx.request_from_socketio:
237                 socketio.emit('run_end', results)
238             else:
239                 # this might overwrite a previously unfetched result
240                 Ctx.set_result(results)
241             Ctx.release()
242
243     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
244         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
245         socketio.emit('run_interval_stats', stats)
246
247     def send_ndr_found(self, ndr_pps):
248         socketio.emit('ndr_found', {'rate_pps': ndr_pps})
249
250     def send_pdr_found(self, pdr_pps):
251         socketio.emit('pdr_found', {'rate_pps': pdr_pps})