NFVBENCH-153 Add support for python3
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 import json
18 import queue
19 from threading import Thread
20 import uuid
21
22 from flask import Flask
23 from flask import jsonify
24 from flask import request
25
26 from .summarizer import NFVBenchSummarizer
27
28 from .log import LOG
29 from .utils import byteify
30 from .utils import RunLock
31
32 from .__init__ import __version__
33
34 STATUS_OK = 'OK'
35 STATUS_ERROR = 'ERROR'
36 STATUS_PENDING = 'PENDING'
37 STATUS_NOT_FOUND = 'NOT_FOUND'
38
39 def result_json(status, message, request_id=None):
40     body = {
41         'status': status,
42         'error_message': message
43     }
44     if request_id is not None:
45         body['request_id'] = request_id
46
47     return body
48
49
50 def load_json(data):
51     return json.loads(json.dumps(data), object_hook=byteify)
52
53
54 def get_uuid():
55     return uuid.uuid4().hex
56
57
58 class Ctx(object):
59     MAXLEN = 5
60     run_queue = queue.Queue()
61     busy = False
62     result = None
63     results = {}
64     ids = []
65     current_id = None
66
67     @staticmethod
68     def enqueue(config, request_id):
69         Ctx.busy = True
70         config['request_id'] = request_id
71         Ctx.run_queue.put(config)
72
73         if len(Ctx.ids) >= Ctx.MAXLEN:
74             try:
75                 del Ctx.results[Ctx.ids.pop(0)]
76             except KeyError:
77                 pass
78         Ctx.ids.append(request_id)
79
80     @staticmethod
81     def dequeue():
82         config = Ctx.run_queue.get()
83         Ctx.current_id = config['request_id']
84         return config
85
86     @staticmethod
87     def release():
88         Ctx.current_id = None
89         Ctx.busy = False
90
91     @staticmethod
92     def set_result(res):
93         res['request_id'] = Ctx.current_id
94         Ctx.results[Ctx.current_id] = res
95         Ctx.result = res
96
97     @staticmethod
98     def get_result(request_id=None):
99         if request_id:
100             try:
101                 res = Ctx.results[request_id]
102             except KeyError:
103                 return None
104             # pylint: disable=unsubscriptable-object
105             if Ctx.result and request_id == Ctx.result['request_id']:
106                 Ctx.result = None
107             return res
108             # pylint: enable=unsubscriptable-object
109         res = Ctx.result
110         if res:
111             Ctx.result = None
112         return res
113
114     @staticmethod
115     def is_busy():
116         return Ctx.busy
117
118     @staticmethod
119     def get_current_request_id():
120         return Ctx.current_id
121
122
123 def setup_flask():
124     app = Flask(__name__)
125     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
126     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
127     not_found_msg = 'results not found'
128     pending_msg = 'NFVbench run still pending'
129
130     # --------- HTTP requests ------------
131
132     @app.route('/version', methods=['GET'])
133     def _version():
134         return __version__
135
136     @app.route('/start_run', methods=['POST'])
137     def _start_run():
138         config = load_json(request.json)
139         if not config:
140             config = {}
141         if Ctx.is_busy():
142             return jsonify(busy_json)
143         request_id = get_uuid()
144         Ctx.enqueue(config, request_id)
145         return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
146
147     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
148     @app.route('/status/<request_id>', methods=['GET'])
149     def _get_status(request_id):
150         if request_id:
151             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
152                 # task with request_id still pending
153                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
154
155             res = Ctx.get_result(request_id)
156             if res:
157                 # found result for given request_id
158                 return jsonify(res)
159             # result for given request_id not found
160             return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
161         if Ctx.is_busy():
162             # task still pending, return with request_id
163             return jsonify(result_json(STATUS_PENDING,
164                                        pending_msg,
165                                        Ctx.get_current_request_id()))
166
167         res = Ctx.get_result()
168         if res:
169             return jsonify(res)
170         return jsonify(not_busy_json)
171
172     return app
173
174 class WebServer(object):
175     """This class takes care of the web server. Caller should simply create an instance
176     of this class and pass a runner object then invoke the run method
177     """
178
179     def __init__(self, runner, fluent_logger):
180         self.nfvbench_runner = runner
181         self.app = setup_flask()
182         self.fluent_logger = fluent_logger
183
184     def run(self, host, port):
185
186         # app.run will not return so we need to run it in a background thread so that
187         # the calling thread (main thread) can keep doing work
188         Thread(target=self.app.run, args=(host, port)).start()
189
190         # wait for run requests
191         # the runner must be executed from the main thread (Trex client library requirement)
192         while True:
193
194             # print 'main thread waiting for requests...'
195             config = Ctx.dequeue()
196             # print 'main thread processing request...'
197             # print config
198             try:
199                 # remove unfilled values as we do not want them to override default values with None
200                 config = {k: v for k, v in list(config.items()) if v is not None}
201                 with RunLock():
202                     if self.fluent_logger:
203                         self.fluent_logger.start_new_run()
204                     results = self.nfvbench_runner.run(config, config)
205             except Exception as exc:
206                 results = result_json(STATUS_ERROR, str(exc))
207                 LOG.exception('NFVbench runner exception:')
208
209             # this might overwrite a previously unfetched result
210             Ctx.set_result(results)
211             try:
212                 summary = NFVBenchSummarizer(results['result'], self.fluent_logger)
213                 LOG.info(str(summary))
214             except KeyError:
215                 # in case of error, 'result' might be missing
216                 if 'error_message' in results:
217                     LOG.error(results['error_message'])
218                 else:
219                     LOG.error('REST request completed without results or error message')
220             Ctx.release()
221             if self.fluent_logger:
222                 self.fluent_logger.send_run_summary(True)