behave_tests: refactor TestAPI DB lookup
[nfvbench.git] / behave_tests / features / steps / steps.py
1 #!/usr/bin/env python
2 # Copyright 2021 Orange
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 from behave import given
18 from behave import when
19 from behave import then
20 from copy import deepcopy
21 from requests import RequestException
22 from retry import retry
23 import json
24 import requests
25 import subprocess
26 from subprocess import DEVNULL
27 from typing import Optional
28
29 from nfvbench.summarizer import Formatter
30 from nfvbench.traffic_gen.traffic_utils import parse_rate_str
31
32 from behave_tests.features.steps.testapi import TestapiClient, nfvbench_input_to_str
33
34
35 STATUS_ERROR = "ERROR"
36
37 STATUS_OK = "OK"
38
39
40 """Given steps."""
41
42
43 @given('PROJECT_NAME: {project_name}')
44 def override_xtesting_project_name(context, project_name):
45     context.data['PROJECT_NAME'] = project_name
46
47
48 @given('TEST_DB_URL: {test_db_url}')
49 def override_xtesting_test_db_url(context, test_db_url):
50     context.data['TEST_DB_URL'] = test_db_url
51     context.data['BASE_TEST_DB_URL'] = context.data['TEST_DB_URL'].replace('results', '')
52
53
54 @given('INSTALLER_TYPE: {installer_type}')
55 def override_xtesting_installer_type(context, installer_type):
56     context.data['INSTALLER_TYPE'] = installer_type
57
58
59 @given('DEPLOY_SCENARIO: {deploy_scenario}')
60 def override_xtesting_deploy_scenario(context, deploy_scenario):
61     context.data['DEPLOY_SCENARIO'] = deploy_scenario
62
63
64 @given('NODE_NAME: {node_name}')
65 def override_xtesting_node_name(context, node_name):
66     context.data['NODE_NAME'] = node_name
67
68
69 @given('BUILD_TAG: {build_tag}')
70 def override_xtesting_build_tag(context, build_tag):
71     context.data['BUILD_TAG'] = build_tag
72
73
74 @given('NFVbench config from file: {config_path}')
75 def init_config(context, config_path):
76     context.data['config'] = config_path
77
78
79 @given('a JSON NFVbench config')
80 def init_config_from_json(context):
81     context.json.update(json.loads(context.text))
82
83
84 @given('log file: {log_file_path}')
85 def log_config(context, log_file_path):
86     context.json['log_file'] = log_file_path
87
88
89 @given('json file: {json_file_path}')
90 def json_config(context, json_file_path):
91     context.json['json'] = json_file_path
92
93
94 @given('no clean up')
95 def add_no_clean_up_flag(context):
96     context.json['no_cleanup'] = 'true'
97
98
99 @given('TRex is restarted')
100 def add_restart(context):
101     context.json['restart'] = 'true'
102
103
104 @given('{label} label')
105 def add_label(context, label):
106     context.json['label'] = label
107
108
109 @given('{frame_size} frame size')
110 def add_frame_size(context, frame_size):
111     context.json['frame_sizes'] = [frame_size]
112
113
114 @given('{flow_count} flow count')
115 def add_flow_count(context, flow_count):
116     context.json['flow_count'] = flow_count
117
118
119 @given('{rate} rate')
120 def add_rate(context, rate):
121     context.json['rate'] = rate
122
123
124 @given('{duration} sec run duration')
125 def add_duration(context, duration):
126     context.json['duration_sec'] = duration
127
128
129 @given('{percentage_rate} rate of previous scenario')
130 def add_percentage_rate(context, percentage_rate):
131     context.percentage_rate = percentage_rate
132     rate = percentage_previous_rate(context, percentage_rate)
133     context.json['rate'] = rate
134     context.logger.info(f"add_percentage_rate: {percentage_rate} => rate={rate}")
135
136
137 @given('packet rate equal to {percentage} of max throughput of last characterization')
138 def add_packet_rate(context, percentage: str):
139     """Update nfvbench run config with packet rate based on reference value.
140
141     For the already configured frame size and flow count, retrieve the max
142     throughput obtained during the latest successful characterization run.  Then
143     retain `percentage` of this value for the packet rate and update `context`.
144
145     Args:
146         context: The context data of the current scenario run.  It includes the
147             testapi endpoints to retrieve the reference values.
148
149         percentage: String representation of the percentage of the reference max
150             throughput.  Example: "70%"
151
152     Updates context:
153         context.percentage_rate: percentage of reference max throughput
154             using a string representation. Example: "70%"
155
156         context.json['rate']: packet rate in packets per second using a string
157             representation.  Example: "2000pps"
158
159     Raises:
160         ValueError: invalid percentage string
161
162         AssertionError: cannot find reference throughput value
163
164     """
165     # Validate percentage
166     if not percentage.endswith('%'):
167         raise ValueError('Invalid percentage string: "{0}"'.format(percentage))
168     percentage_float = convert_percentage_str_to_float(percentage)
169
170     # Retrieve nfvbench results report from testapi for:
171     # - the latest throughput scenario inside a characterization feature that passed
172     # - the test duration, frame size and flow count given in context.json
173     # - (optionally) the user_label and flavor_type given in context.json
174     # - the 'ndr' rate
175     testapi_params = {"project_name": context.data['PROJECT_NAME'],
176                       "case_name": "characterization"}
177     nfvbench_test_conditions = deepcopy(context.json)
178     nfvbench_test_conditions['rate'] = 'ndr'
179     testapi_client = TestapiClient(testapi_url=context.data['TEST_DB_URL'])
180     last_result = testapi_client.find_last_result(testapi_params,
181                                                   scenario_tag="throughput",
182                                                   nfvbench_test_input=nfvbench_test_conditions)
183     if last_result is None:
184         error_msg = "No characterization result found for scenario_tag=throughput"
185         error_msg += " and nfvbench test conditions "
186         error_msg += nfvbench_input_to_str(nfvbench_test_conditions)
187         context.logger.error(error_msg)
188         raise AssertionError(error_msg)
189
190     # From the results report, extract the max throughput in packets per second
191     total_tx_rate = extract_value(last_result["output"], "total_tx_rate")
192     context.logger.info("add_packet_rate: max throughput of last characterization (pps): "
193                         f"{total_tx_rate:,}")
194
195     # Compute the desired packet rate
196     rate = round(total_tx_rate * percentage_float)
197     context.logger.info(f"add_packet_rate: percentage={percentage} rate(pps)={rate:,}")
198
199     # Build rate string using a representation understood by nfvbench
200     rate_str = str(rate) + "pps"
201
202     # Update context
203     context.percentage_rate = percentage
204     context.json['rate'] = rate_str
205
206
207 """When steps."""
208
209
210 @when('NFVbench API is ready')
211 @when('NFVbench API is ready on host {host_ip}')
212 @when('NFVbench API is ready on host {host_ip} and port {port:d}')
213 def start_server(context, host_ip: Optional[str]=None, port: Optional[int]=None):
214     """Start nfvbench server if needed and wait until it is ready.
215
216     Quickly check whether nfvbench HTTP server is ready by reading the "/status"
217     page.  If not, start the server locally.  Then wait until nfvbench API is
218     ready by polling the "/status" page.
219
220     This code is useful when behave and nfvbench run on the same machine.  In
221     particular, it is needed to run behave tests with nfvbench Docker container.
222
223     There is currently no way to prevent behave from starting automatically
224     nfvbench server when this is not desirable, for instance when behave is
225     started using ansible-role-nfvbench.  The user or the orchestration layer
226     should make sure nfvbench API is ready before starting behave tests.
227
228     """
229     # NFVbench server host IP and port number have been setup from environment variables (see
230     # environment.py:before_all()).   Here we allow to override them from feature files:
231     if host_ip is not None:
232         context.host_ip = host_ip
233     if port is not None:
234         context.port = port
235
236     nfvbench_test_url = "http://{ip}:{port}/status".format(ip=context.host_ip, port=context.port)
237     context.logger.info("start_server: test nfvbench API on URL: " + nfvbench_test_url)
238
239     try:
240         # check if API is already available
241         requests.get(nfvbench_test_url)
242     except RequestException:
243         context.logger.info("nfvbench server not running")
244
245         cmd = ["nfvbench", "-c", context.data['config'], "--server"]
246         if context.host_ip != "127.0.0.1":
247             cmd.append("--host")
248             cmd.append(context.host_ip)
249         if context.port != 7555:
250             cmd.append("--port")
251             cmd.append(str(context.port))
252
253         context.logger.info("Start nfvbench server with command: " + " ".join(cmd))
254
255         subprocess.Popen(cmd, stdout=DEVNULL, stderr=subprocess.STDOUT)
256
257     # Wait until nfvbench API is ready
258     test_nfvbench_api(nfvbench_test_url)
259
260
261 """Then steps."""
262
263
264 @then('run is started and waiting for result')
265 @then('{repeat:d} runs are started and waiting for maximum result')
266 def run_nfvbench_traffic(context, repeat=1):
267     context.logger.info(f"run_nfvbench_traffic: fs={context.json['frame_sizes'][0]} "
268                         f"fc={context.json['flow_count']} "
269                         f"rate={context.json['rate']} repeat={repeat}")
270
271     if 'json' not in context.json:
272         # Build filename for nfvbench results in JSON format
273         context.json['json'] = '/var/lib/xtesting/results/' + context.CASE_NAME + \
274                                '/nfvbench-' + context.tag + \
275                                '-fs_' + context.json['frame_sizes'][0] + \
276                                '-fc_' + context.json['flow_count']
277         if context.percentage_rate is not None:
278             # Add rate as a percentage, eg '-rate_70%'
279             context.json['json'] += '-rate_' + context.percentage_rate
280         else:
281             # Add rate in bits or packets per second, eg '-rate_15Gbps' or '-rate_10kpps'
282             context.json['json'] += '-rate_' + context.json['rate']
283         context.json['json'] += '.json'
284
285     json_base_name = context.json['json']
286
287     max_total_tx_rate = None
288     # rem: don't init with 0 in case nfvbench gets crazy and returns a negative packet rate
289
290     for i in range(repeat):
291         if repeat > 1:
292             context.json['json'] = json_base_name.strip('.json') + '-' + str(i) + '.json'
293
294         # Start nfvbench traffic and wait result:
295         url = "http://{ip}:{port}/start_run".format(ip=context.host_ip, port=context.port)
296         payload = json.dumps(context.json)
297         r = requests.post(url, data=payload, headers={'Content-Type': 'application/json'})
298         context.request_id = json.loads(r.text)["request_id"]
299         assert r.status_code == 200
300         result = wait_result(context)
301         assert result["status"] == STATUS_OK
302
303         # Extract useful metrics from result:
304         total_tx_rate = extract_value(result, "total_tx_rate")
305         overall = extract_value(result, "overall")
306         avg_delay_usec = extract_value(overall, "avg_delay_usec")
307
308         # Log latest result:
309         context.logger.info(f"run_nfvbench_traffic: result #{i+1}: "
310                             f"total_tx_rate(pps)={total_tx_rate:,} "  # Add ',' thousand separator
311                             f"avg_latency_usec={round(avg_delay_usec)}")
312
313         # Keep only the result with the highest packet rate:
314         if max_total_tx_rate is None or total_tx_rate > max_total_tx_rate:
315             max_total_tx_rate = total_tx_rate
316             context.result = result
317             context.synthesis['total_tx_rate'] = total_tx_rate
318             context.synthesis['avg_delay_usec'] = avg_delay_usec
319
320     # Log max result only when we did two nfvbench runs or more:
321     if repeat > 1:
322         context.logger.info(f"run_nfvbench_traffic: max result: "
323                             f"total_tx_rate(pps)={context.synthesis['total_tx_rate']:,} "
324                             f"avg_latency_usec={round(context.synthesis['avg_delay_usec'])}")
325
326
327 @then('extract offered rate result')
328 def save_rate_result(context):
329     total_tx_rate = extract_value(context.result, "total_tx_rate")
330     context.rates[context.json['frame_sizes'][0] + '_' + context.json['flow_count']] = total_tx_rate
331
332
333 @then('verify throughput result is in same range as the previous result')
334 @then('verify throughput result is greater than {threshold} of the previous result')
335 def get_throughput_result_from_database(context, threshold='90%'):
336     last_result = get_last_result(context)
337
338     if last_result:
339         compare_throughput_values(context, last_result, threshold)
340
341
342 @then('verify latency result is in same range as the previous result')
343 @then('verify latency result is greater than {threshold} of the previous result')
344 def get_latency_result_from_database(context, threshold='90%'):
345     last_result = get_last_result(context)
346
347     if last_result:
348         compare_latency_values(context, last_result, threshold)
349
350
351 @then('verify latency result is lower than {max_avg_latency_usec:g} microseconds')
352 def check_latency_result_against_fixed_threshold(context, max_avg_latency_usec: float):
353     """Check latency result against a fixed threshold.
354
355     Check that the average latency measured during the current scenario run is
356     lower or equal to the provided fixed reference value.
357
358     Args:
359         context: The context data of the current scenario run.  It includes the
360             test results for that run.
361
362         max_avg_latency_usec: Reference value to be used as a threshold.  This
363             is a maximum average latency expressed in microseconds.
364
365     Raises:
366         AssertionError: The latency result is strictly greater than the reference value.
367
368     """
369     # Get the just measured average latency (a float):
370     new_avg_latency_usec = context.synthesis['avg_delay_usec']
371
372     # Log what we test:
373     context.logger.info("check_latency_result_against_fixed_threshold(usec): "
374                         "{value}<={ref}?".format(
375                             value=round(new_avg_latency_usec),
376                             ref=round(max_avg_latency_usec)))
377
378     # Compare measured value to reference:
379     if new_avg_latency_usec > max_avg_latency_usec:
380         raise AssertionError("Average latency higher than max threshold: "
381                              "{value} usec > {ref} usec".format(
382                                  value=round(new_avg_latency_usec),
383                                  ref=round(max_avg_latency_usec)))
384
385
386 @then(
387     'verify result is in [{min_reference_value}pps, {max_reference_value}pps] range for throughput')
388 def compare_throughput_pps_result_with_range_values(context, min_reference_value,
389                                                     max_reference_value):
390     context.unit = 'pps'
391     reference_values = [min_reference_value + 'pps', max_reference_value + 'pps']
392     throughput_comparison(context, reference_values=reference_values)
393
394
395 @then(
396     'verify result is in [{min_reference_value}bps, {max_reference_value}bps] range for throughput')
397 def compare_throughput_bps_result_with_range_values(context, min_reference_value,
398                                                     max_reference_value):
399     context.unit = 'bps'
400     reference_values = [min_reference_value + 'bps', max_reference_value + 'bps']
401     throughput_comparison(context, reference_values=reference_values)
402
403
404 @then('verify result is in {reference_values} range for latency')
405 def compare_result_with_range_values(context, reference_values):
406     latency_comparison(context, reference_values=reference_values)
407
408
409 @then('verify throughput result is in same range as the characterization result')
410 @then('verify throughput result is greater than {threshold} of the characterization result')
411 def get_characterization_throughput_result_from_database(context, threshold='90%'):
412     last_result = get_last_result(context, True)
413     if not last_result:
414         raise AssertionError("No characterization result found.")
415     compare_throughput_values(context, last_result, threshold)
416
417
418 @then('verify latency result is in same range as the characterization result')
419 @then('verify latency result is greater than {threshold} of the characterization result')
420 def get_characterization_latency_result_from_database(context, threshold='90%'):
421     last_result = get_last_result(context, True)
422     if not last_result:
423         raise AssertionError("No characterization result found.")
424     compare_latency_values(context, last_result, threshold)
425
426 @then('push result to database')
427 def push_result_database(context):
428     if context.tag == "latency":
429         # override input rate value with percentage one to avoid no match
430         # if pps is not accurate with previous one
431         context.json["rate"] = context.percentage_rate
432     json_result = {"synthesis": context.synthesis, "input": context.json, "output": context.result}
433
434     if context.tag not in context.results:
435         context.results[context.tag] = [json_result]
436     else:
437         context.results[context.tag].append(json_result)
438
439
440 """Utils methods."""
441
442
443 @retry(AssertionError, tries=24, delay=5.0, logger=None)
444 def test_nfvbench_api(nfvbench_test_url: str):
445     try:
446         r = requests.get(nfvbench_test_url)
447         assert r.status_code == 200
448         assert json.loads(r.text)["error_message"] == "no pending NFVbench run"
449     except RequestException as exc:
450         raise AssertionError("Fail to access NFVbench API") from exc
451
452
453 @retry(AssertionError, tries=1000, delay=2.0, logger=None)
454 def wait_result(context):
455     r = requests.get("http://{ip}:{port}/status".format(ip=context.host_ip, port=context.port))
456     context.raw_result = r.text
457     result = json.loads(context.raw_result)
458     assert r.status_code == 200
459     assert result["status"] == STATUS_OK or result["status"] == STATUS_ERROR
460     return result
461
462
463 def percentage_previous_rate(context, rate):
464     previous_rate = context.rates[context.json['frame_sizes'][0] + '_' + context.json['flow_count']]
465
466     if rate.endswith('%'):
467         rate_percent = convert_percentage_str_to_float(rate)
468         return str(int(previous_rate * rate_percent)) + 'pps'
469     raise Exception('Unknown rate string format %s' % rate)
470
471
472 def convert_percentage_str_to_float(percentage):
473     float_percent = float(percentage.replace('%', '').strip())
474     if float_percent <= 0 or float_percent > 100.0:
475         raise Exception('%s is out of valid range (must be 1-100%%)' % percentage)
476     return float_percent / 100
477
478
479 def compare_throughput_values(context, last_result, threshold):
480     assert last_result["output"]["status"] == context.result["status"]
481     if last_result["output"]["status"] == "OK":
482         old_throughput = extract_value(last_result["output"], "total_tx_rate")
483         throughput_comparison(context, old_throughput, threshold=threshold)
484
485
486 def compare_latency_values(context, last_result, threshold):
487     assert last_result["output"]["status"] == context.result["status"]
488     if last_result["output"]["status"] == "OK":
489         old_latency = extract_value(extract_value(last_result["output"], "overall"),
490                                     "avg_delay_usec")
491         latency_comparison(context, old_latency, threshold=threshold)
492
493
494 def throughput_comparison(context, old_throughput_pps=None, threshold=None, reference_values=None):
495     current_throughput_pps = extract_value(context.result, "total_tx_rate")
496
497     if old_throughput_pps:
498         if not current_throughput_pps >= convert_percentage_str_to_float(
499                 threshold) * old_throughput_pps:
500             raise AssertionError(
501                 "Current run throughput {current_throughput_pps} is not over {threshold} "
502                 " of previous value ({old_throughput_pps})".format(
503                     current_throughput_pps=Formatter.suffix('pps')(
504                         Formatter.standard(current_throughput_pps)),
505                     threshold=threshold, old_throughput_pps=Formatter.suffix('pps')(
506                         Formatter.standard(old_throughput_pps))))
507     elif reference_values:
508         if context.unit == 'bps':
509             current_throughput = extract_value(context.result, "offered_tx_rate_bps")
510             reference_values = [int(parse_rate_str(x)['rate_bps']) for x in reference_values]
511             formatted_current_throughput = Formatter.bits(current_throughput)
512             formatted_min_reference_value = Formatter.bits(reference_values[0])
513             formatted_max_reference_value = Formatter.bits(reference_values[1])
514         else:
515             current_throughput = current_throughput_pps
516             reference_values = [int(parse_rate_str(x)['rate_pps']) for x in reference_values]
517             formatted_current_throughput = Formatter.suffix('pps')(
518                 Formatter.standard(current_throughput))
519             formatted_min_reference_value = Formatter.suffix('pps')(
520                 Formatter.standard(reference_values[0]))
521             formatted_max_reference_value = Formatter.suffix('pps')(
522                 Formatter.standard(reference_values[1]))
523         if not reference_values[0] <= int(current_throughput) <= reference_values[1]:
524             raise AssertionError(
525                 "Current run throughput {current_throughput} is not in reference values "
526                 "[{min_reference_value}, {max_reference_value}]".format(
527                     current_throughput=formatted_current_throughput,
528                     min_reference_value=formatted_min_reference_value,
529                     max_reference_value=formatted_max_reference_value))
530
531
532 def latency_comparison(context, old_latency=None, threshold=None, reference_values=None):
533     overall = extract_value(context.result, "overall")
534     current_latency = extract_value(overall, "avg_delay_usec")
535
536     if old_latency:
537         if not current_latency <= (2 - convert_percentage_str_to_float(threshold)) * old_latency:
538             threshold = str(200 - int(threshold.strip('%'))) + '%'
539             raise AssertionError(
540                 "Current run latency {current_latency}usec is not less than {threshold} of "
541                 "previous value ({old_latency}usec)".format(
542                     current_latency=Formatter.standard(current_latency), threshold=threshold,
543                     old_latency=Formatter.standard(old_latency)))
544     elif reference_values:
545         if not reference_values[0] <= current_latency <= reference_values[1]:
546             raise AssertionError(
547                 "Current run latency {current_latency}usec is not in reference values "
548                 "[{min_reference_value}, {max_reference_value}]".format(
549                     current_latency=Formatter.standard(current_latency),
550                     min_reference_value=Formatter.standard(reference_values[0]),
551                     max_reference_value=Formatter.standard(reference_values[1])))
552
553
554 def extract_value(obj, key):
555     """Pull all values of specified key from nested JSON."""
556     arr = []
557
558     def extract(obj, arr, key):
559         """Recursively search for values of key in JSON tree."""
560         if isinstance(obj, dict):
561             for k, v in obj.items():
562                 if k == key:
563                     arr.append(v)
564                 elif isinstance(v, (dict, list)):
565                     extract(v, arr, key)
566         elif isinstance(obj, list):
567             for item in obj:
568                 extract(item, arr, key)
569         return arr
570
571     results = extract(obj, arr, key)
572     return results[0]
573
574
575 def get_last_result(context, reference: bool = False):
576     """Look for a previous result in TestAPI database.
577
578     Search TestAPI results from newest to oldest and return the first result
579     record matching the context constraints.  Log an overview of the results
580     found (max rate pps, avg delay usec, test conditions, date of measurement).
581
582     The result record test case must match the current test case
583     ('characterization' or 'non-regression') unless `reference` is set to True.
584
585     The result record scenario tag must match the current scenario tag
586     ('throughput' or 'latency').
587
588     Args:
589         context: behave context including project name, test case name, traffic
590             configuration (frame size, flow count, test duration), type of the
591             compute node under test (via loop VM flavor_type) and platform (via
592             user_label).
593
594         reference: when True, look for results with the 'characterization' test
595             case name instead of the current test case name.
596
597     Returns:
598         a JSON dictionary with the results, ie a dict with the keys "input",
599             "output" and "synthesis" when the scenario tag is 'throughput' or
600             'latency'
601     """
602     if reference:
603         case_name = 'characterization'
604     else:
605         case_name = context.CASE_NAME
606     testapi_params = {"project_name": context.data['PROJECT_NAME'],
607                       "case_name": case_name}
608     testapi_client = TestapiClient(testapi_url=context.data['TEST_DB_URL'])
609     last_result = testapi_client.find_last_result(testapi_params,
610                                                   scenario_tag=context.tag,
611                                                   nfvbench_test_input=context.json)
612     if last_result is None:
613         error_msg = "get_last_result: No result found in TestAPI database:"
614         error_msg += f" case_name={case_name} scenario_tag={context.tag} "
615         error_msg += nfvbench_input_to_str(context.json)
616         context.logger.error(error_msg)
617         raise AssertionError(error_msg)
618
619     # Log an overview of the last result (latency and max throughput)
620     measurement_date = last_result["output"]["result"]["date"]
621     total_tx_rate = extract_value(last_result["output"], "total_tx_rate")
622     avg_delay_usec = extract_value(extract_value(last_result["output"], "overall"),
623                                    "avg_delay_usec")
624     context.logger.info(f"get_last_result: case_name={case_name} scenario_tag={context.tag}"
625                         f' measurement_date="{measurement_date}"'
626                         f" total_tx_rate(pps)={total_tx_rate:,}"
627                         f" avg_latency_usec={round(avg_delay_usec)}")
628
629     return last_result