# under the License.
#
-from functools import reduce
-
from behave import given
from behave import when
from behave import then
+from copy import deepcopy
from requests import RequestException
from retry import retry
import json
import requests
import subprocess
from subprocess import DEVNULL
+from typing import Optional
from nfvbench.summarizer import Formatter
from nfvbench.traffic_gen.traffic_utils import parse_rate_str
+from behave_tests.features.steps.testapi import TestapiClient, nfvbench_input_to_str
+
+
STATUS_ERROR = "ERROR"
STATUS_OK = "OK"
context.data['PROJECT_NAME'] = project_name
-@given('TEST_DB_EXT_URL: {test_db_ext_url}')
-def override_xtesting_test_db_ext_url(context, test_db_ext_url):
- context.data['TEST_DB_EXT_URL'] = test_db_ext_url
-
-
@given('TEST_DB_URL: {test_db_url}')
def override_xtesting_test_db_url(context, test_db_url):
context.data['TEST_DB_URL'] = test_db_url
context.percentage_rate = percentage_rate
rate = percentage_previous_rate(context, percentage_rate)
context.json['rate'] = rate
+ context.logger.info(f"add_percentage_rate: {percentage_rate} => rate={rate}")
+
+
+@given('packet rate equal to {percentage} of max throughput of last characterization')
+def add_packet_rate(context, percentage: str):
+ """Update nfvbench run config with packet rate based on reference value.
+
+ For the already configured frame size and flow count, retrieve the max
+ throughput obtained during the latest successful characterization run. Then
+ retain `percentage` of this value for the packet rate and update `context`.
+
+ Args:
+ context: The context data of the current scenario run. It includes the
+ testapi endpoints to retrieve the reference values.
+
+ percentage: String representation of the percentage of the reference max
+ throughput. Example: "70%"
+
+ Updates context:
+ context.percentage_rate: percentage of reference max throughput
+ using a string representation. Example: "70%"
+
+ context.json['rate']: packet rate in packets per second using a string
+ representation. Example: "2000pps"
+
+ Raises:
+ ValueError: invalid percentage string
+
+ AssertionError: cannot find reference throughput value
+
+ """
+ # Validate percentage
+ if not percentage.endswith('%'):
+ raise ValueError('Invalid percentage string: "{0}"'.format(percentage))
+ percentage_float = convert_percentage_str_to_float(percentage)
+
+ # Retrieve nfvbench results report from testapi for:
+ # - the latest throughput scenario inside a characterization feature that passed
+ # - the test duration, frame size and flow count given in context.json
+ # - (optionally) the user_label and flavor_type given in context.json
+ # - the 'ndr' rate
+ testapi_params = {"project_name": context.data['PROJECT_NAME'],
+ "case_name": "characterization"}
+ nfvbench_test_conditions = deepcopy(context.json)
+ nfvbench_test_conditions['rate'] = 'ndr'
+ testapi_client = TestapiClient(testapi_url=context.data['TEST_DB_URL'])
+ last_result = testapi_client.find_last_result(testapi_params,
+ scenario_tag="throughput",
+ nfvbench_test_input=nfvbench_test_conditions)
+ if last_result is None:
+ error_msg = "No characterization result found for scenario_tag=throughput"
+ error_msg += " and nfvbench test conditions "
+ error_msg += nfvbench_input_to_str(nfvbench_test_conditions)
+ context.logger.error(error_msg)
+ raise AssertionError(error_msg)
+
+ # From the results report, extract the max throughput in packets per second
+ total_tx_rate = extract_value(last_result["output"], "total_tx_rate")
+ context.logger.info("add_packet_rate: max throughput of last characterization (pps): "
+ f"{total_tx_rate:,}")
+
+ # Compute the desired packet rate
+ rate = round(total_tx_rate * percentage_float)
+ context.logger.info(f"add_packet_rate: percentage={percentage} rate(pps)={rate:,}")
+
+ # Build rate string using a representation understood by nfvbench
+ rate_str = str(rate) + "pps"
+
+ # Update context
+ context.percentage_rate = percentage
+ context.json['rate'] = rate_str
"""When steps."""
@when('NFVbench API is ready')
@when('NFVbench API is ready on host {host_ip}')
@when('NFVbench API is ready on host {host_ip} and port {port:d}')
-def start_server(context, host_ip="127.0.0.1", port=7555):
- context.host_ip = host_ip
- context.port = port
+def start_server(context, host_ip: Optional[str]=None, port: Optional[int]=None):
+ """Start nfvbench server if needed and wait until it is ready.
+
+ Quickly check whether nfvbench HTTP server is ready by reading the "/status"
+ page. If not, start the server locally. Then wait until nfvbench API is
+ ready by polling the "/status" page.
+
+ This code is useful when behave and nfvbench run on the same machine. In
+ particular, it is needed to run behave tests with nfvbench Docker container.
+
+ There is currently no way to prevent behave from starting automatically
+ nfvbench server when this is not desirable, for instance when behave is
+ started using ansible-role-nfvbench. The user or the orchestration layer
+ should make sure nfvbench API is ready before starting behave tests.
+
+ """
+ # NFVbench server host IP and port number have been setup from environment variables (see
+ # environment.py:before_all()). Here we allow to override them from feature files:
+ if host_ip is not None:
+ context.host_ip = host_ip
+ if port is not None:
+ context.port = port
+
+ nfvbench_test_url = "http://{ip}:{port}/status".format(ip=context.host_ip, port=context.port)
+ context.logger.info("start_server: test nfvbench API on URL: " + nfvbench_test_url)
+
try:
# check if API is already available
- requests.get(
- "http://{host_ip}:{port}/status".format(host_ip=context.host_ip, port=context.port))
+ requests.get(nfvbench_test_url)
except RequestException:
+ context.logger.info("nfvbench server not running")
+
cmd = ["nfvbench", "-c", context.data['config'], "--server"]
- if host_ip != "127.0.0.1":
+ if context.host_ip != "127.0.0.1":
cmd.append("--host")
- cmd.append(host_ip)
- if port != 7555:
+ cmd.append(context.host_ip)
+ if context.port != 7555:
cmd.append("--port")
- cmd.append(port)
+ cmd.append(str(context.port))
+
+ context.logger.info("Start nfvbench server with command: " + " ".join(cmd))
subprocess.Popen(cmd, stdout=DEVNULL, stderr=subprocess.STDOUT)
- test_nfvbench_api(context)
+ # Wait until nfvbench API is ready
+ test_nfvbench_api(nfvbench_test_url)
"""Then steps."""
@then('run is started and waiting for result')
@then('{repeat:d} runs are started and waiting for maximum result')
-def step_impl(context, repeat=1):
- results = []
+def run_nfvbench_traffic(context, repeat=1):
+ context.logger.info(f"run_nfvbench_traffic: fs={context.json['frame_sizes'][0]} "
+ f"fc={context.json['flow_count']} "
+ f"rate={context.json['rate']} repeat={repeat}")
+
if 'json' not in context.json:
+ # Build filename for nfvbench results in JSON format
context.json['json'] = '/var/lib/xtesting/results/' + context.CASE_NAME + \
- '/nfvbench-' + context.tag + '-fs_' + \
- context.json['frame_sizes'][0] + '-fc_' + \
- context.json['flow_count'] + '-rate_' + \
- context.json['rate'] + '.json'
+ '/nfvbench-' + context.tag + \
+ '-fs_' + context.json['frame_sizes'][0] + \
+ '-fc_' + context.json['flow_count']
+ if context.percentage_rate is not None:
+ # Add rate as a percentage, eg '-rate_70%'
+ context.json['json'] += '-rate_' + context.percentage_rate
+ else:
+ # Add rate in bits or packets per second, eg '-rate_15Gbps' or '-rate_10kpps'
+ context.json['json'] += '-rate_' + context.json['rate']
+ context.json['json'] += '.json'
+
json_base_name = context.json['json']
+
+ max_total_tx_rate = None
+ # rem: don't init with 0 in case nfvbench gets crazy and returns a negative packet rate
+
for i in range(repeat):
if repeat > 1:
context.json['json'] = json_base_name.strip('.json') + '-' + str(i) + '.json'
+ # Start nfvbench traffic and wait result:
url = "http://{ip}:{port}/start_run".format(ip=context.host_ip, port=context.port)
payload = json.dumps(context.json)
r = requests.post(url, data=payload, headers={'Content-Type': 'application/json'})
context.request_id = json.loads(r.text)["request_id"]
assert r.status_code == 200
result = wait_result(context)
- results.append(result)
assert result["status"] == STATUS_OK
+ # Extract useful metrics from result:
+ total_tx_rate = extract_value(result, "total_tx_rate")
+ overall = extract_value(result, "overall")
+ avg_delay_usec = extract_value(overall, "avg_delay_usec")
- context.result = reduce(
- lambda x, y: x if extract_value(x, "total_tx_rate") > extract_value(y,
- "total_tx_rate") else y,
- results)
+ # Log latest result:
+ context.logger.info(f"run_nfvbench_traffic: result #{i+1}: "
+ f"total_tx_rate(pps)={total_tx_rate:,} " # Add ',' thousand separator
+ f"avg_latency_usec={round(avg_delay_usec)}")
- total_tx_rate = extract_value(context.result, "total_tx_rate")
- context.rates[context.json['frame_sizes'][0] + '_' + context.json['flow_count']] = total_tx_rate
- overall = extract_value(context.result, "overall")
- avg_delay_usec = extract_value(overall, "avg_delay_usec")
- # create a synthesis with offered pps and latency values
- context.synthesis['total_tx_rate'] = total_tx_rate
- context.synthesis['avg_delay_usec'] = avg_delay_usec
+ # Keep only the result with the highest packet rate:
+ if max_total_tx_rate is None or total_tx_rate > max_total_tx_rate:
+ max_total_tx_rate = total_tx_rate
+ context.result = result
+ context.synthesis['total_tx_rate'] = total_tx_rate
+ context.synthesis['avg_delay_usec'] = avg_delay_usec
+
+ # Log max result only when we did two nfvbench runs or more:
+ if repeat > 1:
+ context.logger.info(f"run_nfvbench_traffic: max result: "
+ f"total_tx_rate(pps)={context.synthesis['total_tx_rate']:,} "
+ f"avg_latency_usec={round(context.synthesis['avg_delay_usec'])}")
@then('extract offered rate result')
if last_result:
compare_latency_values(context, last_result, threshold)
+
+@then('verify latency result is lower than {max_avg_latency_usec:g} microseconds')
+def check_latency_result_against_fixed_threshold(context, max_avg_latency_usec: float):
+ """Check latency result against a fixed threshold.
+
+ Check that the average latency measured during the current scenario run is
+ lower or equal to the provided fixed reference value.
+
+ Args:
+ context: The context data of the current scenario run. It includes the
+ test results for that run.
+
+ max_avg_latency_usec: Reference value to be used as a threshold. This
+ is a maximum average latency expressed in microseconds.
+
+ Raises:
+ AssertionError: The latency result is strictly greater than the reference value.
+
+ """
+ # Get the just measured average latency (a float):
+ new_avg_latency_usec = context.synthesis['avg_delay_usec']
+
+ # Log what we test:
+ context.logger.info("check_latency_result_against_fixed_threshold(usec): "
+ "{value}<={ref}?".format(
+ value=round(new_avg_latency_usec),
+ ref=round(max_avg_latency_usec)))
+
+ # Compare measured value to reference:
+ if new_avg_latency_usec > max_avg_latency_usec:
+ raise AssertionError("Average latency higher than max threshold: "
+ "{value} usec > {ref} usec".format(
+ value=round(new_avg_latency_usec),
+ ref=round(max_avg_latency_usec)))
+
+
@then(
'verify result is in [{min_reference_value}pps, {max_reference_value}pps] range for throughput')
def compare_throughput_pps_result_with_range_values(context, min_reference_value,
@retry(AssertionError, tries=24, delay=5.0, logger=None)
-def test_nfvbench_api(context):
+def test_nfvbench_api(nfvbench_test_url: str):
try:
- r = requests.get("http://{ip}:{port}/status".format(ip=context.host_ip, port=context.port))
+ r = requests.get(nfvbench_test_url)
assert r.status_code == 200
assert json.loads(r.text)["error_message"] == "no pending NFVbench run"
except RequestException as exc:
threshold) * old_throughput_pps:
raise AssertionError(
"Current run throughput {current_throughput_pps} is not over {threshold} "
- " of previous value ({old_throughput_pps}pps)".format(
+ " of previous value ({old_throughput_pps})".format(
current_throughput_pps=Formatter.suffix('pps')(
Formatter.standard(current_throughput_pps)),
threshold=threshold, old_throughput_pps=Formatter.suffix('pps')(
max_reference_value=Formatter.standard(reference_values[1])))
-def get_result_from_input_values(input, result):
- # Select required keys (other keys can be not set or unconsistent between scenarios)
- required_keys = ['duration_sec', 'frame_sizes', 'flow_count', 'rate']
- if 'user_label' in result:
- required_keys.append('user_label')
- if 'flavor_type' in result:
- required_keys.append('flavor_type')
- subset_input = dict((k, input[k]) for k in required_keys if k in input)
- subset_result = dict((k, result[k]) for k in required_keys if k in result)
- return subset_input == subset_result
-
-
def extract_value(obj, key):
"""Pull all values of specified key from nested JSON."""
arr = []
return results[0]
-def get_last_result(context, reference=None, page=None):
+def get_last_result(context, reference: bool = False):
+ """Look for a previous result in TestAPI database.
+
+ Search TestAPI results from newest to oldest and return the first result
+ record matching the context constraints. Log an overview of the results
+ found (max rate pps, avg delay usec, test conditions, date of measurement).
+
+ The result record test case must match the current test case
+ ('characterization' or 'non-regression') unless `reference` is set to True.
+
+ The result record scenario tag must match the current scenario tag
+ ('throughput' or 'latency').
+
+ Args:
+ context: behave context including project name, test case name, traffic
+ configuration (frame size, flow count, test duration), type of the
+ compute node under test (via loop VM flavor_type) and platform (via
+ user_label).
+
+ reference: when True, look for results with the 'characterization' test
+ case name instead of the current test case name.
+
+ Returns:
+ a JSON dictionary with the results, ie a dict with the keys "input",
+ "output" and "synthesis" when the scenario tag is 'throughput' or
+ 'latency'
+ """
if reference:
case_name = 'characterization'
else:
case_name = context.CASE_NAME
- url = context.data['TEST_DB_URL'] + '?project={project_name}&case={case_name}'.format(
- project_name=context.data['PROJECT_NAME'], case_name=case_name)
- if context.data['INSTALLER_TYPE']:
- url += '&installer={installer_name}'.format(installer_name=context.data['INSTALLER_TYPE'])
- if context.data['NODE_NAME']:
- url += '&pod={pod_name}'.format(pod_name=context.data['NODE_NAME'])
- url += '&criteria=PASS'
- if page:
- url += '&page={page}'.format(page=page)
- last_results = requests.get(url)
- assert last_results.status_code == 200
- last_results = json.loads(last_results.text)
- for result in last_results["results"]:
- for tagged_result in result["details"]["results"][context.tag]:
- if get_result_from_input_values(tagged_result["input"], context.json):
- return tagged_result
- if last_results["pagination"]["current_page"] < last_results["pagination"]["total_pages"]:
- page = last_results["pagination"]["current_page"] + 1
- return get_last_result(context, page)
- return None
+ testapi_params = {"project_name": context.data['PROJECT_NAME'],
+ "case_name": case_name}
+ testapi_client = TestapiClient(testapi_url=context.data['TEST_DB_URL'])
+ last_result = testapi_client.find_last_result(testapi_params,
+ scenario_tag=context.tag,
+ nfvbench_test_input=context.json)
+ if last_result is None:
+ error_msg = "get_last_result: No result found in TestAPI database:"
+ error_msg += f" case_name={case_name} scenario_tag={context.tag} "
+ error_msg += nfvbench_input_to_str(context.json)
+ context.logger.error(error_msg)
+ raise AssertionError(error_msg)
+
+ # Log an overview of the last result (latency and max throughput)
+ measurement_date = last_result["output"]["result"]["date"]
+ total_tx_rate = extract_value(last_result["output"], "total_tx_rate")
+ avg_delay_usec = extract_value(extract_value(last_result["output"], "overall"),
+ "avg_delay_usec")
+ context.logger.info(f"get_last_result: case_name={case_name} scenario_tag={context.tag}"
+ f' measurement_date="{measurement_date}"'
+ f" total_tx_rate(pps)={total_tx_rate:,}"
+ f" avg_latency_usec={round(avg_delay_usec)}")
+
+ return last_result