NFVBENCH-35 Runlogdate is 0 in resultnfvbench
[nfvbench.git] / nfvbench / nfvbenchd.py
1 #!/usr/bin/env python
2 # Copyright 2017 Cisco Systems, Inc.  All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 from flask import Flask
18 from flask import jsonify
19 from flask import render_template
20 from flask import request
21
22 from flask_socketio import emit
23 from flask_socketio import SocketIO
24 from fluentd import FluentLogHandler
25 from summarizer import NFVBenchSummarizer
26
27 import json
28 from log import LOG
29 import Queue
30 import traceback
31 from utils import byteify
32 from utils import RunLock
33 import uuid
34
35 # this global cannot reside in Ctx because of the @app and @socketio decorators
36 app = None
37 socketio = None
38
39 STATUS_OK = 'OK'
40 STATUS_ERROR = 'ERROR'
41 STATUS_PENDING = 'PENDING'
42 STATUS_NOT_FOUND = 'NOT_FOUND'
43
44
45 def result_json(status, message, request_id=None):
46     body = {
47         'status': status,
48         'error_message': message
49     }
50
51     if request_id is not None:
52         body['request_id'] = request_id
53
54     return body
55
56
57 def load_json(data):
58     return json.loads(json.dumps(data), object_hook=byteify)
59
60
61 def get_uuid():
62     return uuid.uuid4().hex
63
64
65 class Ctx(object):
66     MAXLEN = 5
67     run_queue = Queue.Queue()
68     busy = False
69     result = None
70     request_from_socketio = False
71     results = {}
72     ids = []
73     current_id = None
74
75     @staticmethod
76     def enqueue(config, request_id, from_socketio=False):
77         Ctx.busy = True
78         Ctx.request_from_socketio = from_socketio
79         config['request_id'] = request_id
80         Ctx.run_queue.put(config)
81
82         if len(Ctx.ids) >= Ctx.MAXLEN:
83             try:
84                 del Ctx.results[Ctx.ids.pop(0)]
85             except KeyError:
86                 pass
87         Ctx.ids.append(request_id)
88
89     @staticmethod
90     def dequeue():
91         config = Ctx.run_queue.get()
92         Ctx.current_id = config['request_id']
93         return config
94
95     @staticmethod
96     def release():
97         Ctx.current_id = None
98         Ctx.busy = False
99
100     @staticmethod
101     def set_result(res):
102         res['request_id'] = Ctx.current_id
103         Ctx.results[Ctx.current_id] = res
104         Ctx.result = res
105
106     @staticmethod
107     def get_result(request_id=None):
108         if request_id:
109             try:
110                 res = Ctx.results[request_id]
111             except KeyError:
112                 return None
113
114             if Ctx.result and request_id == Ctx.result['request_id']:
115                 Ctx.result = None
116
117             return res
118         else:
119             res = Ctx.result
120             if res:
121                 Ctx.result = None
122             return res
123
124     @staticmethod
125     def is_busy():
126         return Ctx.busy
127
128     @staticmethod
129     def get_current_request_id():
130         return Ctx.current_id
131
132
133 def setup_flask(root_path):
134     global socketio
135     global app
136     app = Flask(__name__)
137     app.root_path = root_path
138     socketio = SocketIO(app, async_mode='threading')
139     busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
140     not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
141     not_found_msg = 'results not found'
142     pending_msg = 'NFVbench run still pending'
143
144     # --------- socketio requests ------------
145
146     @socketio.on('start_run')
147     def socketio_start_run(config):
148         if not Ctx.is_busy():
149             Ctx.enqueue(config, get_uuid(), from_socketio=True)
150         else:
151             emit('error', {'reason': 'there is already an NFVbench request running'})
152
153     @socketio.on('echo')
154     def socketio_echo(config):
155         emit('echo', config)
156
157     # --------- HTTP requests ------------
158
159     @app.route('/')
160     def index():
161         return render_template('index.html')
162
163     @app.route('/echo', methods=['GET'])
164     def echo():
165         config = request.json
166         return jsonify(config)
167
168     @app.route('/start_run', methods=['POST'])
169     def start_run():
170         config = load_json(request.json)
171         if not config:
172             config = {}
173         if Ctx.is_busy():
174             return jsonify(busy_json)
175         request_id = get_uuid()
176         Ctx.enqueue(config, request_id)
177         return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
178
179     @app.route('/status', defaults={'request_id': None}, methods=['GET'])
180     @app.route('/status/<request_id>', methods=['GET'])
181     def get_status(request_id):
182         if request_id:
183             if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
184                 # task with request_id still pending
185                 return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
186
187             res = Ctx.get_result(request_id)
188             if res:
189                 # found result for given request_id
190                 return jsonify(res)
191             else:
192                 # result for given request_id not found
193                 return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
194         else:
195             if Ctx.is_busy():
196                 # task still pending, return with request_id
197                 return jsonify(result_json(STATUS_PENDING,
198                                            pending_msg,
199                                            Ctx.get_current_request_id()))
200
201             res = Ctx.get_result()
202             if res:
203                 return jsonify(res)
204             else:
205                 return jsonify(not_busy_json)
206
207
208 class WebSocketIoServer(object):
209     """This class takes care of the web socketio server, accepts websocket events, and sends back
210     notifications using websocket events (send_ methods). Caller should simply create an instance
211     of this class and pass a runner object then invoke the run method
212     """
213
214     def __init__(self, http_root, runner, logger):
215         self.nfvbench_runner = runner
216         setup_flask(http_root)
217         self.fluent_logger = logger
218         self.result_fluent_logger = None
219         if self.fluent_logger:
220             self.result_fluent_logger = \
221                 FluentLogHandler("resultnfvbench",
222                                  fluentd_ip=self.fluent_logger.sender.host,
223                                  fluentd_port=self.fluent_logger.sender.port)
224             self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
225
226     def run(self, host='127.0.0.1', port=7556):
227
228         # socketio.run will not return so we need to run it in a background thread so that
229         # the calling thread (main thread) can keep doing work
230         socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
231
232         # wait for run requests
233         # the runner must be executed from the main thread (Trex client library requirement)
234         while True:
235
236             # print 'main thread waiting for requests...'
237             config = Ctx.dequeue()
238             # print 'main thread processing request...'
239             print config
240             try:
241                 # remove unfilled values as we do not want them to override default values with None
242                 config = {k: v for k, v in config.items() if v is not None}
243                 with RunLock():
244                     results = self.nfvbench_runner.run(config, config)
245             except Exception as exc:
246                 print 'NFVbench runner exception:'
247                 traceback.print_exc()
248                 results = result_json(STATUS_ERROR, str(exc))
249                 LOG.exception()
250
251             if Ctx.request_from_socketio:
252                 socketio.emit('run_end', results)
253             else:
254                 # this might overwrite a previously unfetched result
255                 Ctx.set_result(results)
256             if self.fluent_logger:
257                 self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
258             summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
259             LOG.info(str(summary))
260             Ctx.release()
261             if self.fluent_logger:
262                 self.fluent_logger.send_run_summary(True)
263
264     def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
265         stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
266         socketio.emit('run_interval_stats', stats)
267
268     def send_ndr_found(self, ndr_pps):
269         socketio.emit('ndr_found', {'rate_pps': ndr_pps})
270
271     def send_pdr_found(self, pdr_pps):
272         socketio.emit('pdr_found', {'rate_pps': pdr_pps})