[NFVBENCH-168] Improve config properties managed after a REST call
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 import json
18 import queue
19 from threading import Thread
20 import uuid
21
22 from flask import Flask
23 from flask import jsonify
24 from flask import request
25
26 from .summarizer import NFVBenchSummarizer
27
28 from .log import LOG
29 from .utils import RunLock
30
31 from .__init__ import __version__
32
33 STATUS_OK = 'OK'
34 STATUS_ERROR = 'ERROR'
35 STATUS_PENDING = 'PENDING'
36 STATUS_NOT_FOUND = 'NOT_FOUND'
37
38 def result_json(status, message, request_id=None):
39     body = {
40         'status': status,
41         'error_message': message
42     }
43     if request_id is not None:
44         body['request_id'] = request_id
45
46     return body
47
48
49 def load_json(data):
50     return json.loads(json.dumps(data))
51
52
53 def get_uuid():
54     return uuid.uuid4().hex
55
56
57 class Ctx(object):
58     MAXLEN = 5
59     run_queue = queue.Queue()
60     busy = False
61     result = None
62     results = {}
63     ids = []
64     current_id = None
65
66     @staticmethod
67     def enqueue(config, request_id):
68         Ctx.busy = True
69         config['request_id'] = request_id
70         Ctx.run_queue.put(config)
71
72         if len(Ctx.ids) >= Ctx.MAXLEN:
73             try:
74                 del Ctx.results[Ctx.ids.pop(0)]
75             except KeyError:
76                 pass
77         Ctx.ids.append(request_id)
78
79     @staticmethod
80     def dequeue():
81         config = Ctx.run_queue.get()
82         Ctx.current_id = config['request_id']
83         return config
84
85     @staticmethod
86     def release():
87         Ctx.current_id = None
88         Ctx.busy = False
89
90     @staticmethod
91     def set_result(res):
92         res['request_id'] = Ctx.current_id
93         Ctx.results[Ctx.current_id] = res
94         Ctx.result = res
95
96     @staticmethod
97     def get_result(request_id=None):
98         if request_id:
99             try:
100                 res = Ctx.results[request_id]
101             except KeyError:
102                 return None
103             # pylint: disable=unsubscriptable-object
104             if Ctx.result and request_id == Ctx.result['request_id']:
105                 Ctx.result = None
106             return res
107             # pylint: enable=unsubscriptable-object
108         res = Ctx.result
109         if res:
110             Ctx.result = None
111         return res
112
113     @staticmethod
114     def is_busy():
115         return Ctx.busy
116
117     @staticmethod
118     def get_current_request_id():
119         return Ctx.current_id
120
121
122 def setup_flask():
123     app = Flask(__name__)
124     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
125     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
126     not_found_msg = 'results not found'
127     pending_msg = 'NFVbench run still pending'
128
129     # --------- HTTP requests ------------
130
131     @app.route('/version', methods=['GET'])
132     def _version():
133         return __version__
134
135     @app.route('/start_run', methods=['POST'])
136     def _start_run():
137         config = load_json(request.json)
138         if not config:
139             config = {}
140         if Ctx.is_busy():
141             return jsonify(busy_json)
142         request_id = get_uuid()
143         Ctx.enqueue(config, request_id)
144         return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
145
146     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
147     @app.route('/status/<request_id>', methods=['GET'])
148     def _get_status(request_id):
149         if request_id:
150             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
151                 # task with request_id still pending
152                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
153
154             res = Ctx.get_result(request_id)
155             if res:
156                 # found result for given request_id
157                 return jsonify(res)
158             # result for given request_id not found
159             return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
160         if Ctx.is_busy():
161             # task still pending, return with request_id
162             return jsonify(result_json(STATUS_PENDING,
163                                        pending_msg,
164                                        Ctx.get_current_request_id()))
165
166         res = Ctx.get_result()
167         if res:
168             return jsonify(res)
169         return jsonify(not_busy_json)
170
171     return app
172
173 class WebServer(object):
174     """This class takes care of the web server. Caller should simply create an instance
175     of this class and pass a runner object then invoke the run method
176     """
177
178     def __init__(self, runner, fluent_logger):
179         self.nfvbench_runner = runner
180         self.app = setup_flask()
181         self.fluent_logger = fluent_logger
182
183     def run(self, host, port):
184
185         # app.run will not return so we need to run it in a background thread so that
186         # the calling thread (main thread) can keep doing work
187         Thread(target=self.app.run, args=(host, port)).start()
188
189         # wait for run requests
190         # the runner must be executed from the main thread (Trex client library requirement)
191         while True:
192
193             # print 'main thread waiting for requests...'
194             config = Ctx.dequeue()
195             # print 'main thread processing request...'
196             # print config
197             try:
198                 # remove unfilled values as we do not want them to override default values with None
199                 config = {k: v for k, v in list(config.items()) if v is not None}
200                 with RunLock():
201                     if self.fluent_logger:
202                         self.fluent_logger.start_new_run()
203                     results = self.nfvbench_runner.run(config, config)
204             except Exception as exc:
205                 results = result_json(STATUS_ERROR, str(exc))
206                 LOG.exception('NFVbench runner exception:')
207
208             # this might overwrite a previously unfetched result
209             Ctx.set_result(results)
210             try:
211                 summary = NFVBenchSummarizer(results['result'], self.fluent_logger)
212                 LOG.info(str(summary))
213                 if 'json' in config and 'result' in results and results['status']:
214                     self.nfvbench_runner.save(results['result'])
215             except KeyError:
216                 # in case of error, 'result' might be missing
217                 if 'error_message' in results:
218                     LOG.error(results['error_message'])
219                 else:
220                     LOG.error('REST request completed without results or error message')
221             Ctx.release()
222             if self.fluent_logger:
223                 self.fluent_logger.send_run_summary(True)