NFVBENCH-126 Remove socketio support (debt reduction)
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 import json
18 import Queue
19 from threading import Thread
20 import uuid
21
22 from flask import Flask
23 from flask import jsonify
24 from flask import request
25
26 from summarizer import NFVBenchSummarizer
27
28 from log import LOG
29 from utils import byteify
30 from utils import RunLock
31
32 from __init__ import __version__
33
34 STATUS_OK = 'OK'
35 STATUS_ERROR = 'ERROR'
36 STATUS_PENDING = 'PENDING'
37 STATUS_NOT_FOUND = 'NOT_FOUND'
38
39 def result_json(status, message, request_id=None):
40     body = {
41         'status': status,
42         'error_message': message
43     }
44     if request_id is not None:
45         body['request_id'] = request_id
46
47     return body
48
49
50 def load_json(data):
51     return json.loads(json.dumps(data), object_hook=byteify)
52
53
54 def get_uuid():
55     return uuid.uuid4().hex
56
57
58 class Ctx(object):
59     MAXLEN = 5
60     run_queue = Queue.Queue()
61     busy = False
62     result = None
63     results = {}
64     ids = []
65     current_id = None
66
67     @staticmethod
68     def enqueue(config, request_id):
69         Ctx.busy = True
70         config['request_id'] = request_id
71         Ctx.run_queue.put(config)
72
73         if len(Ctx.ids) >= Ctx.MAXLEN:
74             try:
75                 del Ctx.results[Ctx.ids.pop(0)]
76             except KeyError:
77                 pass
78         Ctx.ids.append(request_id)
79
80     @staticmethod
81     def dequeue():
82         config = Ctx.run_queue.get()
83         Ctx.current_id = config['request_id']
84         return config
85
86     @staticmethod
87     def release():
88         Ctx.current_id = None
89         Ctx.busy = False
90
91     @staticmethod
92     def set_result(res):
93         res['request_id'] = Ctx.current_id
94         Ctx.results[Ctx.current_id] = res
95         Ctx.result = res
96
97     @staticmethod
98     def get_result(request_id=None):
99         if request_id:
100             try:
101                 res = Ctx.results[request_id]
102             except KeyError:
103                 return None
104
105             if Ctx.result and request_id == Ctx.result['request_id']:
106                 Ctx.result = None
107
108             return res
109         else:
110             res = Ctx.result
111             if res:
112                 Ctx.result = None
113             return res
114
115     @staticmethod
116     def is_busy():
117         return Ctx.busy
118
119     @staticmethod
120     def get_current_request_id():
121         return Ctx.current_id
122
123
124 def setup_flask():
125     app = Flask(__name__)
126     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
127     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
128     not_found_msg = 'results not found'
129     pending_msg = 'NFVbench run still pending'
130
131     # --------- HTTP requests ------------
132
133     @app.route('/version', methods=['GET'])
134     def _version():
135         return __version__
136
137     @app.route('/start_run', methods=['POST'])
138     def _start_run():
139         config = load_json(request.json)
140         if not config:
141             config = {}
142         if Ctx.is_busy():
143             return jsonify(busy_json)
144         request_id = get_uuid()
145         Ctx.enqueue(config, request_id)
146         return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
147
148     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
149     @app.route('/status/<request_id>', methods=['GET'])
150     def _get_status(request_id):
151         if request_id:
152             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
153                 # task with request_id still pending
154                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
155
156             res = Ctx.get_result(request_id)
157             if res:
158                 # found result for given request_id
159                 return jsonify(res)
160             # result for given request_id not found
161             return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
162         else:
163             if Ctx.is_busy():
164                 # task still pending, return with request_id
165                 return jsonify(result_json(STATUS_PENDING,
166                                            pending_msg,
167                                            Ctx.get_current_request_id()))
168
169             res = Ctx.get_result()
170             if res:
171                 return jsonify(res)
172             return jsonify(not_busy_json)
173
174     return app
175
176
177 class WebServer(object):
178     """This class takes care of the web server. Caller should simply create an instance
179     of this class and pass a runner object then invoke the run method
180     """
181
182     def __init__(self, runner, fluent_logger):
183         self.nfvbench_runner = runner
184         self.app = setup_flask()
185         self.fluent_logger = fluent_logger
186
187     def run(self, host, port):
188
189         # app.run will not return so we need to run it in a background thread so that
190         # the calling thread (main thread) can keep doing work
191         Thread(target=self.app.run, args=(host, port)).start()
192
193         # wait for run requests
194         # the runner must be executed from the main thread (Trex client library requirement)
195         while True:
196
197             # print 'main thread waiting for requests...'
198             config = Ctx.dequeue()
199             # print 'main thread processing request...'
200             # print config
201             try:
202                 # remove unfilled values as we do not want them to override default values with None
203                 config = {k: v for k, v in config.items() if v is not None}
204                 with RunLock():
205                     if self.fluent_logger:
206                         self.fluent_logger.start_new_run()
207                     results = self.nfvbench_runner.run(config, config)
208             except Exception as exc:
209                 results = result_json(STATUS_ERROR, str(exc))
210                 LOG.exception('NFVbench runner exception:')
211
212             # this might overwrite a previously unfetched result
213             Ctx.set_result(results)
214             try:
215                 summary = NFVBenchSummarizer(results['result'], self.fluent_logger)
216                 LOG.info(str(summary))
217             except KeyError:
218                 # in case of error, 'result' might be missing
219                 if 'error_message' in results:
220                     LOG.error(results['error_message'])
221                 else:
222                     LOG.error('REST request completed without results or error message')
223             Ctx.release()
224             if self.fluent_logger:
225                 self.fluent_logger.send_run_summary(True)