behave_tests: log nfvbench traffic runs
behave_tests/features/steps/steps.py (nfvbench.git)
#!/usr/bin/env python
# Copyright 2021 Orange
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
#

from functools import reduce

from behave import given
from behave import when
from behave import then
from requests import RequestException
from retry import retry
import json
import requests
import subprocess
from subprocess import DEVNULL
from typing import Optional

from nfvbench.summarizer import Formatter
from nfvbench.traffic_gen.traffic_utils import parse_rate_str

STATUS_ERROR = "ERROR"

STATUS_OK = "OK"


"""Given steps."""

@given('PROJECT_NAME: {project_name}')
def override_xtesting_project_name(context, project_name):
    context.data['PROJECT_NAME'] = project_name


@given('TEST_DB_URL: {test_db_url}')
def override_xtesting_test_db_url(context, test_db_url):
    context.data['TEST_DB_URL'] = test_db_url
    context.data['BASE_TEST_DB_URL'] = context.data['TEST_DB_URL'].replace('results', '')


@given('INSTALLER_TYPE: {installer_type}')
def override_xtesting_installer_type(context, installer_type):
    context.data['INSTALLER_TYPE'] = installer_type


@given('DEPLOY_SCENARIO: {deploy_scenario}')
def override_xtesting_deploy_scenario(context, deploy_scenario):
    context.data['DEPLOY_SCENARIO'] = deploy_scenario


@given('NODE_NAME: {node_name}')
def override_xtesting_node_name(context, node_name):
    context.data['NODE_NAME'] = node_name


@given('BUILD_TAG: {build_tag}')
def override_xtesting_build_tag(context, build_tag):
    context.data['BUILD_TAG'] = build_tag


@given('NFVbench config from file: {config_path}')
def init_config(context, config_path):
    context.data['config'] = config_path


@given('a JSON NFVbench config')
def init_config_from_json(context):
    context.json.update(json.loads(context.text))


@given('log file: {log_file_path}')
def log_config(context, log_file_path):
    context.json['log_file'] = log_file_path


@given('json file: {json_file_path}')
def json_config(context, json_file_path):
    context.json['json'] = json_file_path


@given('no clean up')
def add_no_clean_up_flag(context):
    context.json['no_cleanup'] = 'true'


@given('TRex is restarted')
def add_restart(context):
    context.json['restart'] = 'true'


@given('{label} label')
def add_label(context, label):
    context.json['label'] = label


@given('{frame_size} frame size')
def add_frame_size(context, frame_size):
    context.json['frame_sizes'] = [frame_size]


@given('{flow_count} flow count')
def add_flow_count(context, flow_count):
    context.json['flow_count'] = flow_count


@given('{rate} rate')
def add_rate(context, rate):
    context.json['rate'] = rate


@given('{duration} sec run duration')
def add_duration(context, duration):
    context.json['duration_sec'] = duration


@given('{percentage_rate} rate of previous scenario')
def add_percentage_rate(context, percentage_rate):
    context.percentage_rate = percentage_rate
    rate = percentage_previous_rate(context, percentage_rate)
    context.json['rate'] = rate
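
# Example (illustrative values): assuming the previous run for the same frame
# size and flow count measured 10,000,000 pps, the feature line
#     Given 70% rate of previous scenario
# sets context.json['rate'] to '7000000pps' (int(10000000 * 0.7) pps, see
# percentage_previous_rate() below).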


"""When steps."""


@when('NFVbench API is ready')
@when('NFVbench API is ready on host {host_ip}')
@when('NFVbench API is ready on host {host_ip} and port {port:d}')
def start_server(context, host_ip: Optional[str] = None, port: Optional[int] = None):
    # NFVbench server host IP and port number have been set up from environment variables (see
    # environment.py:before_all()).  Here we allow overriding them from feature files:
    if host_ip is not None:
        context.host_ip = host_ip
    if port is not None:
        context.port = port

    try:
        # Check if the API is already available:
        requests.get(
            "http://{host_ip}:{port}/status".format(host_ip=context.host_ip, port=context.port))
    except RequestException:
        cmd = ["nfvbench", "-c", context.data['config'], "--server"]
        if context.host_ip != "127.0.0.1":
            cmd.append("--host")
            cmd.append(context.host_ip)
        if context.port != 7555:
            cmd.append("--port")
            cmd.append(str(context.port))

        subprocess.Popen(cmd, stdout=DEVNULL, stderr=subprocess.STDOUT)

    test_nfvbench_api(context)
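
# Note: when the API does not answer yet, start_server() spawns an NFVbench
# server with a command line equivalent to
#     nfvbench -c <config_path> --server [--host <host_ip>] [--port <port>]
# and then waits for http://<host_ip>:<port>/status to become reachable
# (see test_nfvbench_api() below).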


"""Then steps."""


@then('run is started and waiting for result')
@then('{repeat:d} runs are started and waiting for maximum result')
def run_nfvbench_traffic(context, repeat=1):
    context.logger.info(f"run_nfvbench_traffic: fs={context.json['frame_sizes'][0]} "
                        f"fc={context.json['flow_count']} "
                        f"rate={context.json['rate']} repeat={repeat}")

    results = []
    if 'json' not in context.json:
        context.json['json'] = '/var/lib/xtesting/results/' + context.CASE_NAME + \
                               '/nfvbench-' + context.tag + '-fs_' + \
                               context.json['frame_sizes'][0] + '-fc_' + \
                               context.json['flow_count'] + '-rate_' + \
                               context.json['rate'] + '.json'
    json_base_name = context.json['json']
    for i in range(repeat):
        if repeat > 1:
            # str.strip('.json') would remove any leading/trailing '.', 'j', 's', 'o'
            # or 'n' characters, so remove the extension explicitly before adding the
            # run index:
            base = (json_base_name[:-len('.json')]
                    if json_base_name.endswith('.json') else json_base_name)
            context.json['json'] = base + '-' + str(i) + '.json'

        url = "http://{ip}:{port}/start_run".format(ip=context.host_ip, port=context.port)
        payload = json.dumps(context.json)
        r = requests.post(url, data=payload, headers={'Content-Type': 'application/json'})
        context.request_id = json.loads(r.text)["request_id"]
        assert r.status_code == 200
        result = wait_result(context)
        results.append(result)
        assert result["status"] == STATUS_OK

        # Log latest result:
        total_tx_rate = extract_value(result, "total_tx_rate")
        overall = extract_value(result, "overall")
        avg_delay_usec = extract_value(overall, "avg_delay_usec")
        context.logger.info(f"run_nfvbench_traffic: result #{i+1}: "
                            f"total_tx_rate(pps)={total_tx_rate:,} "  # Add ',' thousand separator
                            f"avg_latency_usec={round(avg_delay_usec)}")

    # Keep only the result with the highest rate:
    context.result = reduce(
        lambda x, y: x if extract_value(x, "total_tx_rate") > extract_value(y, "total_tx_rate")
        else y,
        results)

    total_tx_rate = extract_value(context.result, "total_tx_rate")
    overall = extract_value(context.result, "overall")
    avg_delay_usec = extract_value(overall, "avg_delay_usec")
    # Create a synthesis with offered pps and latency values:
    context.synthesis['total_tx_rate'] = total_tx_rate
    context.synthesis['avg_delay_usec'] = avg_delay_usec

    # Log max result only when we did two nfvbench runs or more:
    if repeat > 1:
        context.logger.info(f"run_nfvbench_traffic: max result: "
                            f"total_tx_rate(pps)={total_tx_rate:,} "
                            f"avg_latency_usec={round(avg_delay_usec)}")
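
# The default result file name built above follows the pattern (placeholders in
# angle brackets):
#     /var/lib/xtesting/results/<CASE_NAME>/
#         nfvbench-<tag>-fs_<frame size>-fc_<flow count>-rate_<rate>.json
# and, when several runs are requested, a '-<run index>' suffix is appended
# before the '.json' extension.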


@then('extract offered rate result')
def save_rate_result(context):
    total_tx_rate = extract_value(context.result, "total_tx_rate")
    context.rates[context.json['frame_sizes'][0] + '_' + context.json['flow_count']] = total_tx_rate


@then('verify throughput result is in same range as the previous result')
@then('verify throughput result is greater than {threshold} of the previous result')
def get_throughput_result_from_database(context, threshold='90%'):
    last_result = get_last_result(context)

    if last_result:
        compare_throughput_values(context, last_result, threshold)


@then('verify latency result is in same range as the previous result')
@then('verify latency result is greater than {threshold} of the previous result')
def get_latency_result_from_database(context, threshold='90%'):
    last_result = get_last_result(context)

    if last_result:
        compare_latency_values(context, last_result, threshold)

@then('verify latency result is lower than {max_avg_latency_usec:g} microseconds')
def check_latency_result_against_fixed_threshold(context, max_avg_latency_usec: float):
    """Check latency result against a fixed threshold.

    Check that the average latency measured during the current scenario run is
    lower than or equal to the provided fixed reference value.

    Args:
        context: The context data of the current scenario run.  It includes the
            test results for that run.

        max_avg_latency_usec: Reference value to be used as a threshold.  This
            is a maximum average latency expressed in microseconds.

    Raises:
        AssertionError: The latency result is strictly greater than the reference value.

    """
    # Get the just measured average latency (a float):
    new_avg_latency_usec = context.synthesis['avg_delay_usec']

    # Compare measured value to reference:
    if new_avg_latency_usec > max_avg_latency_usec:
        raise AssertionError("Average latency higher than max threshold: "
                             "{avg_latency} usec > {threshold} usec".format(
                                 avg_latency=round(new_avg_latency_usec),
                                 threshold=round(max_avg_latency_usec)))


@then(
    'verify result is in [{min_reference_value}pps, {max_reference_value}pps] range for throughput')
def compare_throughput_pps_result_with_range_values(context, min_reference_value,
                                                    max_reference_value):
    context.unit = 'pps'
    reference_values = [min_reference_value + 'pps', max_reference_value + 'pps']
    throughput_comparison(context, reference_values=reference_values)


@then(
    'verify result is in [{min_reference_value}bps, {max_reference_value}bps] range for throughput')
def compare_throughput_bps_result_with_range_values(context, min_reference_value,
                                                    max_reference_value):
    context.unit = 'bps'
    reference_values = [min_reference_value + 'bps', max_reference_value + 'bps']
    throughput_comparison(context, reference_values=reference_values)


@then('verify result is in {reference_values} range for latency')
def compare_result_with_range_values(context, reference_values):
    latency_comparison(context, reference_values=reference_values)


@then('verify throughput result is in same range as the characterization result')
@then('verify throughput result is greater than {threshold} of the characterization result')
def get_characterization_throughput_result_from_database(context, threshold='90%'):
    last_result = get_last_result(context, True)
    if not last_result:
        raise AssertionError("No characterization result found.")
    compare_throughput_values(context, last_result, threshold)


@then('verify latency result is in same range as the characterization result')
@then('verify latency result is greater than {threshold} of the characterization result')
def get_characterization_latency_result_from_database(context, threshold='90%'):
    last_result = get_last_result(context, True)
    if not last_result:
        raise AssertionError("No characterization result found.")
    compare_latency_values(context, last_result, threshold)


@then('push result to database')
def push_result_database(context):
    if context.tag == "latency":
        # Override the input rate value with the percentage string so that
        # result matching does not fail when the absolute pps value slightly
        # differs from the previous run:
        context.json["rate"] = context.percentage_rate
    json_result = {"synthesis": context.synthesis, "input": context.json, "output": context.result}

    if context.tag not in context.results:
        context.results[context.tag] = [json_result]
    else:
        context.results[context.tag].append(json_result)
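
# For reference, the structure accumulated here is roughly:
#     context.results = {<tag>: [{"synthesis": ..., "input": ..., "output": ...}, ...]}
# i.e. one list of result dictionaries per test tag.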


"""Utils methods."""


@retry(AssertionError, tries=24, delay=5.0, logger=None)
def test_nfvbench_api(context):
    try:
        r = requests.get("http://{ip}:{port}/status".format(ip=context.host_ip, port=context.port))
        assert r.status_code == 200
        assert json.loads(r.text)["error_message"] == "no pending NFVbench run"
    except RequestException as exc:
        raise AssertionError("Failed to access NFVbench API") from exc
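
# With tries=24 and delay=5.0, the retry decorator above gives the NFVbench API
# roughly two minutes (24 * 5 s) to come up before the step fails.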


@retry(AssertionError, tries=1000, delay=2.0, logger=None)
def wait_result(context):
    r = requests.get("http://{ip}:{port}/status".format(ip=context.host_ip, port=context.port))
    context.raw_result = r.text
    result = json.loads(context.raw_result)
    assert r.status_code == 200
    assert result["status"] == STATUS_OK or result["status"] == STATUS_ERROR
    return result
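
# wait_result() polls the /status endpoint every 2 seconds, up to 1000 times
# (i.e. a bit more than half an hour), until NFVbench reports a final status
# (STATUS_OK or STATUS_ERROR).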


def percentage_previous_rate(context, rate):
    previous_rate = context.rates[context.json['frame_sizes'][0] + '_' + context.json['flow_count']]

    if rate.endswith('%'):
        rate_percent = convert_percentage_str_to_float(rate)
        return str(int(previous_rate * rate_percent)) + 'pps'
    raise Exception('Unknown rate string format %s' % rate)


def convert_percentage_str_to_float(percentage):
    float_percent = float(percentage.replace('%', '').strip())
    if float_percent <= 0 or float_percent > 100.0:
        raise Exception('%s is out of valid range (must be > 0%% and <= 100%%)' % percentage)
    return float_percent / 100
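
# For example, convert_percentage_str_to_float('85%') returns 0.85; values that
# are not strictly positive or that exceed 100% raise an exception.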


def compare_throughput_values(context, last_result, threshold):
    assert last_result["output"]["status"] == context.result["status"]
    if last_result["output"]["status"] == "OK":
        old_throughput = extract_value(last_result["output"], "total_tx_rate")
        throughput_comparison(context, old_throughput, threshold=threshold)


def compare_latency_values(context, last_result, threshold):
    assert last_result["output"]["status"] == context.result["status"]
    if last_result["output"]["status"] == "OK":
        old_latency = extract_value(extract_value(last_result["output"], "overall"),
                                    "avg_delay_usec")
        latency_comparison(context, old_latency, threshold=threshold)


def throughput_comparison(context, old_throughput_pps=None, threshold=None, reference_values=None):
    current_throughput_pps = extract_value(context.result, "total_tx_rate")

    if old_throughput_pps:
        if not current_throughput_pps >= convert_percentage_str_to_float(
                threshold) * old_throughput_pps:
            raise AssertionError(
                "Current run throughput {current_throughput_pps} is not over {threshold} "
                "of previous value ({old_throughput_pps})".format(
                    current_throughput_pps=Formatter.suffix('pps')(
                        Formatter.standard(current_throughput_pps)),
                    threshold=threshold, old_throughput_pps=Formatter.suffix('pps')(
                        Formatter.standard(old_throughput_pps))))
    elif reference_values:
        if context.unit == 'bps':
            current_throughput = extract_value(context.result, "offered_tx_rate_bps")
            reference_values = [int(parse_rate_str(x)['rate_bps']) for x in reference_values]
            formatted_current_throughput = Formatter.bits(current_throughput)
            formatted_min_reference_value = Formatter.bits(reference_values[0])
            formatted_max_reference_value = Formatter.bits(reference_values[1])
        else:
            current_throughput = current_throughput_pps
            reference_values = [int(parse_rate_str(x)['rate_pps']) for x in reference_values]
            formatted_current_throughput = Formatter.suffix('pps')(
                Formatter.standard(current_throughput))
            formatted_min_reference_value = Formatter.suffix('pps')(
                Formatter.standard(reference_values[0]))
            formatted_max_reference_value = Formatter.suffix('pps')(
                Formatter.standard(reference_values[1]))
        if not reference_values[0] <= int(current_throughput) <= reference_values[1]:
            raise AssertionError(
                "Current run throughput {current_throughput} is not in reference values "
                "[{min_reference_value}, {max_reference_value}]".format(
                    current_throughput=formatted_current_throughput,
                    min_reference_value=formatted_min_reference_value,
                    max_reference_value=formatted_max_reference_value))
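
# Threshold check example (illustrative values): with threshold='90%' and a
# previous throughput of 10,000,000 pps, the current run passes as long as it
# reaches at least 0.9 * 10,000,000 = 9,000,000 pps.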


def latency_comparison(context, old_latency=None, threshold=None, reference_values=None):
    overall = extract_value(context.result, "overall")
    current_latency = extract_value(overall, "avg_delay_usec")

    if old_latency:
        if not current_latency <= (2 - convert_percentage_str_to_float(threshold)) * old_latency:
            threshold = str(200 - int(threshold.strip('%'))) + '%'
            raise AssertionError(
                "Current run latency {current_latency}usec is not less than {threshold} of "
                "previous value ({old_latency}usec)".format(
                    current_latency=Formatter.standard(current_latency), threshold=threshold,
                    old_latency=Formatter.standard(old_latency)))
    elif reference_values:
        if not reference_values[0] <= current_latency <= reference_values[1]:
            raise AssertionError(
                "Current run latency {current_latency}usec is not in reference values "
                "[{min_reference_value}, {max_reference_value}]".format(
                    current_latency=Formatter.standard(current_latency),
                    min_reference_value=Formatter.standard(reference_values[0]),
                    max_reference_value=Formatter.standard(reference_values[1])))
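
# Threshold check example (illustrative values): latency is "better when
# lower", so with threshold='90%' the current latency may exceed the previous
# value by at most a factor (2 - 0.9) = 1.1, e.g. up to 110 usec against a
# previous 100 usec.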


def get_result_from_input_values(input, result):
    # Select required keys (other keys can be unset or inconsistent between scenarios)
    required_keys = ['duration_sec', 'frame_sizes', 'flow_count', 'rate']
    if 'user_label' in result:
        required_keys.append('user_label')
    if 'flavor_type' in result:
        required_keys.append('flavor_type')
    subset_input = dict((k, input[k]) for k in required_keys if k in input)
    subset_result = dict((k, result[k]) for k in required_keys if k in result)
    return subset_input == subset_result
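
# Matching is restricted to the keys listed above, so two runs that only differ
# in fields such as the 'json' output path or 'log_file' are still considered
# the same test case.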


def extract_value(obj, key):
    """Pull the first value of the specified key from nested JSON."""
    arr = []

    def extract(obj, arr, key):
        """Recursively search for values of key in JSON tree."""
        if isinstance(obj, dict):
            for k, v in obj.items():
                if k == key:
                    arr.append(v)
                elif isinstance(v, (dict, list)):
                    extract(v, arr, key)
        elif isinstance(obj, list):
            for item in obj:
                extract(item, arr, key)
        return arr

    results = extract(obj, arr, key)
    return results[0]
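
# For example:
#     extract_value({"a": {"overall": {"avg_delay_usec": 42.0}}}, "avg_delay_usec")
# returns 42.0 (the first occurrence found in the nested structure).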


def get_last_result(context, reference=None, page=None):
    if reference:
        case_name = 'characterization'
    else:
        case_name = context.CASE_NAME
    url = context.data['TEST_DB_URL'] + '?project={project_name}&case={case_name}'.format(
        project_name=context.data['PROJECT_NAME'], case_name=case_name)
    if context.data['INSTALLER_TYPE']:
        url += '&installer={installer_name}'.format(installer_name=context.data['INSTALLER_TYPE'])
    if context.data['NODE_NAME']:
        url += '&pod={pod_name}'.format(pod_name=context.data['NODE_NAME'])
    url += '&criteria=PASS'
    if page:
        url += '&page={page}'.format(page=page)
    last_results = requests.get(url)
    assert last_results.status_code == 200
    last_results = json.loads(last_results.text)
    for result in last_results["results"]:
        for tagged_result in result["details"]["results"][context.tag]:
            if get_result_from_input_values(tagged_result["input"], context.json):
                return tagged_result
    if last_results["pagination"]["current_page"] < last_results["pagination"]["total_pages"]:
        page = last_results["pagination"]["current_page"] + 1
        # Recurse on the next page, keeping the reference flag so that the case
        # name stays the same across pages:
        return get_last_result(context, reference, page)
    return None
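
# The query built above looks like (placeholders in angle brackets):
#     <TEST_DB_URL>?project=<PROJECT_NAME>&case=<case name>
#         &installer=<INSTALLER_TYPE>&pod=<NODE_NAME>&criteria=PASS[&page=<n>]
# and each returned result is matched against the current scenario input via
# get_result_from_input_values() above.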