# under the License.
#
-from functools import reduce
-
from behave import given
from behave import when
from behave import then
+from copy import deepcopy
from requests import RequestException
from retry import retry
import json
import requests
import subprocess
from subprocess import DEVNULL
+from typing import Optional
from nfvbench.summarizer import Formatter
from nfvbench.traffic_gen.traffic_utils import parse_rate_str
+from testapi import TestapiClient, nfvbench_input_to_str
+
+
STATUS_ERROR = "ERROR"
STATUS_OK = "OK"
context.data['PROJECT_NAME'] = project_name
-@given('TEST_DB_EXT_URL: {test_db_ext_url}')
-def override_xtesting_test_db_ext_url(context, test_db_ext_url):
- context.data['TEST_DB_EXT_URL'] = test_db_ext_url
-
-
@given('TEST_DB_URL: {test_db_url}')
def override_xtesting_test_db_url(context, test_db_url):
context.data['TEST_DB_URL'] = test_db_url
context.percentage_rate = percentage_rate
rate = percentage_previous_rate(context, percentage_rate)
context.json['rate'] = rate
+ context.logger.info(f"add_percentage_rate: {percentage_rate} => rate={rate}")
+
+
+@given('packet rate equal to {percentage} of max throughput of last characterization')
+def add_packet_rate(context, percentage: str):
+ """Update nfvbench run config with packet rate based on reference value.
+
+ For the already configured frame size and flow count, retrieve the max
+ throughput obtained during the latest successful characterization run. Then
+ retain `percentage` of this value for the packet rate and update `context`.
+
+ Args:
+ context: The context data of the current scenario run. It includes the
+ testapi endpoints to retrieve the reference values.
+
+ percentage: String representation of the percentage of the reference max
+ throughput. Example: "70%"
+
+ Updates context:
+ context.percentage_rate: percentage of reference max throughput
+ using a string representation. Example: "70%"
+
+ context.json['rate']: packet rate in packets per second using a string
+ representation. Example: "2000pps"
+
+ Raises:
+ ValueError: invalid percentage string
+
+ AssertionError: cannot find reference throughput value
+
+ """
+ # Validate percentage
+ if not percentage.endswith('%'):
+ raise ValueError('Invalid percentage string: "{0}"'.format(percentage))
+ percentage_float = convert_percentage_str_to_float(percentage)
+
+ # Retrieve nfvbench results report from testapi for:
+ # - the latest throughput scenario inside a characterization feature that passed
+ # - the test duration, frame size and flow count given in context.json
+ # - (optionally) the user_label and flavor_type given in context.json
+ # - the 'ndr' rate
+ testapi_params = {"project_name": context.data['PROJECT_NAME'],
+ "case_name": "characterization"}
+ nfvbench_test_conditions = deepcopy(context.json)
+ nfvbench_test_conditions['rate'] = 'ndr'
+ testapi_client = TestapiClient(testapi_url=context.data['TEST_DB_URL'],
+ logger=context.logger)
+ last_result = testapi_client.find_last_result(testapi_params,
+ scenario_tag="throughput",
+ nfvbench_test_input=nfvbench_test_conditions)
+ if last_result is None:
+ error_msg = "No characterization result found for scenario_tag=throughput"
+ error_msg += " and nfvbench test conditions "
+ error_msg += nfvbench_input_to_str(nfvbench_test_conditions)
+ raise AssertionError(error_msg)
+
+ # From the results report, extract the max throughput in packets per second
+ total_tx_rate = extract_value(last_result["output"], "total_tx_rate")
+ context.logger.info("add_packet_rate: max throughput of last characterization (pps): "
+ f"{total_tx_rate:,}")
+
+ # Compute the desired packet rate
+ rate = round(total_tx_rate * percentage_float)
+ context.logger.info(f"add_packet_rate: percentage={percentage} rate(pps)={rate:,}")
+
+ # Build rate string using a representation understood by nfvbench
+ rate_str = str(rate) + "pps"
+
+ # Update context
+ context.percentage_rate = percentage
+ context.json['rate'] = rate_str
"""When steps."""
@when('NFVbench API is ready')
@when('NFVbench API is ready on host {host_ip}')
@when('NFVbench API is ready on host {host_ip} and port {port:d}')
-def start_server(context, host_ip="127.0.0.1", port=7555):
- context.host_ip = host_ip
- context.port = port
+def start_server(context, host_ip: Optional[str]=None, port: Optional[int]=None):
+ """Start nfvbench server if needed and wait until it is ready.
+
+ Quickly check whether nfvbench HTTP server is ready by reading the "/status"
+ page. If not, start the server locally. Then wait until nfvbench API is
+ ready by polling the "/status" page.
+
+ This code is useful when behave and nfvbench run on the same machine. In
+ particular, it is needed to run behave tests with nfvbench Docker container.
+
+ There is currently no way to prevent behave from starting automatically
+ nfvbench server when this is not desirable, for instance when behave is
+ started using ansible-role-nfvbench. The user or the orchestration layer
+ should make sure nfvbench API is ready before starting behave tests.
+
+ """
+ # NFVbench server host IP and port number have been setup from environment variables (see
+ # environment.py:before_all()). Here we allow to override them from feature files:
+ if host_ip is not None:
+ context.host_ip = host_ip
+ if port is not None:
+ context.port = port
+
+ nfvbench_test_url = "http://{ip}:{port}/status".format(ip=context.host_ip, port=context.port)
+ context.logger.info("start_server: test nfvbench API on URL: " + nfvbench_test_url)
+
try:
# check if API is already available
- requests.get(
- "http://{host_ip}:{port}/status".format(host_ip=context.host_ip, port=context.port))
+ requests.get(nfvbench_test_url)
except RequestException:
+ context.logger.info("nfvbench server not running")
+
cmd = ["nfvbench", "-c", context.data['config'], "--server"]
- if host_ip != "127.0.0.1":
+ if context.host_ip != "127.0.0.1":
cmd.append("--host")
- cmd.append(host_ip)
- if port != 7555:
+ cmd.append(context.host_ip)
+ if context.port != 7555:
cmd.append("--port")
- cmd.append(port)
+ cmd.append(str(context.port))
+
+ context.logger.info("Start nfvbench server with command: " + " ".join(cmd))
subprocess.Popen(cmd, stdout=DEVNULL, stderr=subprocess.STDOUT)
- test_nfvbench_api(context)
+ # Wait until nfvbench API is ready
+ test_nfvbench_api(nfvbench_test_url)
"""Then steps."""
@then('run is started and waiting for result')
@then('{repeat:d} runs are started and waiting for maximum result')
-def step_impl(context, repeat=1):
- results = []
+def run_nfvbench_traffic(context, repeat=1):
+ context.logger.info(f"run_nfvbench_traffic: fs={context.json['frame_sizes'][0]} "
+ f"fc={context.json['flow_count']} "
+ f"rate={context.json['rate']} repeat={repeat}")
+
if 'json' not in context.json:
+ # Build filename for nfvbench results in JSON format
context.json['json'] = '/var/lib/xtesting/results/' + context.CASE_NAME + \
- '/nfvbench-' + context.tag + '-fs_' + \
- context.json['frame_sizes'][0] + '-fc_' + \
- context.json['flow_count'] + '-rate_' + \
- context.json['rate'] + '.json'
+ '/nfvbench-' + context.tag + \
+ '-fs_' + context.json['frame_sizes'][0] + \
+ '-fc_' + context.json['flow_count']
+ if context.percentage_rate is not None:
+ # Add rate as a percentage, eg '-rate_70%'
+ context.json['json'] += '-rate_' + context.percentage_rate
+ else:
+ # Add rate in bits or packets per second, eg '-rate_15Gbps' or '-rate_10kpps'
+ context.json['json'] += '-rate_' + context.json['rate']
+ context.json['json'] += '.json'
+
json_base_name = context.json['json']
+
+ max_total_tx_rate = None
+ # rem: don't init with 0 in case nfvbench gets crazy and returns a negative packet rate
+
for i in range(repeat):
if repeat > 1:
context.json['json'] = json_base_name.strip('.json') + '-' + str(i) + '.json'
+ # Start nfvbench traffic and wait result:
url = "http://{ip}:{port}/start_run".format(ip=context.host_ip, port=context.port)
payload = json.dumps(context.json)
r = requests.post(url, data=payload, headers={'Content-Type': 'application/json'})
context.request_id = json.loads(r.text)["request_id"]
assert r.status_code == 200
result = wait_result(context)
- results.append(result)
assert result["status"] == STATUS_OK
+ # Extract useful metrics from result:
+ total_tx_rate = extract_value(result, "total_tx_rate")
+ overall = extract_value(result, "overall")
+ avg_delay_usec = extract_value(overall, "avg_delay_usec")
- context.result = reduce(
- lambda x, y: x if extract_value(x, "total_tx_rate") > extract_value(y,
- "total_tx_rate") else y,
- results)
+ # Log latest result:
+ context.logger.info(f"run_nfvbench_traffic: result #{i+1}: "
+ f"total_tx_rate(pps)={total_tx_rate:,} " # Add ',' thousand separator
+ f"avg_latency_usec={round(avg_delay_usec)}")
- total_tx_rate = extract_value(context.result, "total_tx_rate")
- context.rates[context.json['frame_sizes'][0] + '_' + context.json['flow_count']] = total_tx_rate
- overall = extract_value(context.result, "overall")
- avg_delay_usec = extract_value(overall, "avg_delay_usec")
- # create a synthesis with offered pps and latency values
- context.synthesis['total_tx_rate'] = total_tx_rate
- context.synthesis['avg_delay_usec'] = avg_delay_usec
+ # Keep only the result with the highest packet rate:
+ if max_total_tx_rate is None or total_tx_rate > max_total_tx_rate:
+ max_total_tx_rate = total_tx_rate
+ context.result = result
+ context.synthesis['total_tx_rate'] = total_tx_rate
+ context.synthesis['avg_delay_usec'] = avg_delay_usec
+
+ # Log max result only when we did two nfvbench runs or more:
+ if repeat > 1:
+ context.logger.info(f"run_nfvbench_traffic: max result: "
+ f"total_tx_rate(pps)={context.synthesis['total_tx_rate']:,} "
+ f"avg_latency_usec={round(context.synthesis['avg_delay_usec'])}")
@then('extract offered rate result')
if last_result:
compare_latency_values(context, last_result, threshold)
+
+@then('verify latency result is lower than {max_avg_latency_usec:g} microseconds')
+def check_latency_result_against_fixed_threshold(context, max_avg_latency_usec: float):
+ """Check latency result against a fixed threshold.
+
+ Check that the average latency measured during the current scenario run is
+ lower or equal to the provided fixed reference value.
+
+ Args:
+ context: The context data of the current scenario run. It includes the
+ test results for that run.
+
+ max_avg_latency_usec: Reference value to be used as a threshold. This
+ is a maximum average latency expressed in microseconds.
+
+ Raises:
+ AssertionError: The latency result is strictly greater than the reference value.
+
+ """
+ # Get the just measured average latency (a float):
+ new_avg_latency_usec = context.synthesis['avg_delay_usec']
+
+ # Log what we test:
+ context.logger.info("check_latency_result_against_fixed_threshold(usec): "
+ "{value}<={ref}?".format(
+ value=round(new_avg_latency_usec),
+ ref=round(max_avg_latency_usec)))
+
+ # Compare measured value to reference:
+ if new_avg_latency_usec > max_avg_latency_usec:
+ raise AssertionError("Average latency higher than max threshold: "
+ "{value} usec > {ref} usec".format(
+ value=round(new_avg_latency_usec),
+ ref=round(max_avg_latency_usec)))
+
+
@then(
'verify result is in [{min_reference_value}pps, {max_reference_value}pps] range for throughput')
def compare_throughput_pps_result_with_range_values(context, min_reference_value,
@retry(AssertionError, tries=24, delay=5.0, logger=None)
-def test_nfvbench_api(context):
+def test_nfvbench_api(nfvbench_test_url: str):
try:
- r = requests.get("http://{ip}:{port}/status".format(ip=context.host_ip, port=context.port))
+ r = requests.get(nfvbench_test_url)
assert r.status_code == 200
assert json.loads(r.text)["error_message"] == "no pending NFVbench run"
except RequestException as exc:
def get_result_from_input_values(input, result):
+ """Check test conditions in scenario results input.
+
+ Check whether the input parameters of a behave scenario results record from
+ testapi match the input parameters of the latest test. In other words,
+ check that the test results from testapi come from a test done under the
+ same conditions (frame size, flow count, rate, ...)
+
+ Args:
+ input: input dict of a results dict of a behave scenario from testapi
+
+ result: dict of nfvbench params used during the last test
+
+ Returns:
+ True if test conditions match, else False.
+
+ """
# Select required keys (other keys can be not set or unconsistent between scenarios)
required_keys = ['duration_sec', 'frame_sizes', 'flow_count', 'rate']
if 'user_label' in result:
return tagged_result
if last_results["pagination"]["current_page"] < last_results["pagination"]["total_pages"]:
page = last_results["pagination"]["current_page"] + 1
- return get_last_result(context, page)
+ return get_last_result(context, reference, page)
return None