behave_tests: refactor TestAPI DB lookup
[nfvbench.git] / behave_tests / features / steps / steps.py
index 965b0c8..c347871 100644 (file)
 #    under the License.
 #
 
-from functools import reduce
-
 from behave import given
 from behave import when
 from behave import then
+from copy import deepcopy
 from requests import RequestException
 from retry import retry
 import json
 import requests
 import subprocess
 from subprocess import DEVNULL
+from typing import Optional
 
 from nfvbench.summarizer import Formatter
 from nfvbench.traffic_gen.traffic_utils import parse_rate_str
 
+from behave_tests.features.steps.testapi import TestapiClient, nfvbench_input_to_str
+
+
 STATUS_ERROR = "ERROR"
 
 STATUS_OK = "OK"
@@ -42,11 +45,6 @@ def override_xtesting_project_name(context, project_name):
     context.data['PROJECT_NAME'] = project_name
 
 
-@given('TEST_DB_EXT_URL: {test_db_ext_url}')
-def override_xtesting_test_db_ext_url(context, test_db_ext_url):
-    context.data['TEST_DB_EXT_URL'] = test_db_ext_url
-
-
 @given('TEST_DB_URL: {test_db_url}')
 def override_xtesting_test_db_url(context, test_db_url):
     context.data['TEST_DB_URL'] = test_db_url
@@ -133,6 +131,77 @@ def add_percentage_rate(context, percentage_rate):
     context.percentage_rate = percentage_rate
     rate = percentage_previous_rate(context, percentage_rate)
     context.json['rate'] = rate
+    context.logger.info(f"add_percentage_rate: {percentage_rate} => rate={rate}")
+
+
+@given('packet rate equal to {percentage} of max throughput of last characterization')
+def add_packet_rate(context, percentage: str):
+    """Update nfvbench run config with packet rate based on reference value.
+
+    For the already configured frame size and flow count, retrieve the max
+    throughput obtained during the latest successful characterization run.  Then
+    retain `percentage` of this value for the packet rate and update `context`.
+
+    Args:
+        context: The context data of the current scenario run.  It includes the
+            testapi endpoints to retrieve the reference values.
+
+        percentage: String representation of the percentage of the reference max
+            throughput.  Example: "70%"
+
+    Updates context:
+        context.percentage_rate: percentage of reference max throughput
+            using a string representation. Example: "70%"
+
+        context.json['rate']: packet rate in packets per second using a string
+            representation.  Example: "2000pps"
+
+    Raises:
+        ValueError: invalid percentage string
+
+        AssertionError: cannot find reference throughput value
+
+    """
+    # Validate percentage
+    if not percentage.endswith('%'):
+        raise ValueError('Invalid percentage string: "{0}"'.format(percentage))
+    percentage_float = convert_percentage_str_to_float(percentage)
+
+    # Retrieve nfvbench results report from testapi for:
+    # - the latest throughput scenario inside a characterization feature that passed
+    # - the test duration, frame size and flow count given in context.json
+    # - (optionally) the user_label and flavor_type given in context.json
+    # - the 'ndr' rate
+    testapi_params = {"project_name": context.data['PROJECT_NAME'],
+                      "case_name": "characterization"}
+    nfvbench_test_conditions = deepcopy(context.json)
+    nfvbench_test_conditions['rate'] = 'ndr'
+    testapi_client = TestapiClient(testapi_url=context.data['TEST_DB_URL'])
+    last_result = testapi_client.find_last_result(testapi_params,
+                                                  scenario_tag="throughput",
+                                                  nfvbench_test_input=nfvbench_test_conditions)
+    if last_result is None:
+        error_msg = "No characterization result found for scenario_tag=throughput"
+        error_msg += " and nfvbench test conditions "
+        error_msg += nfvbench_input_to_str(nfvbench_test_conditions)
+        context.logger.error(error_msg)
+        raise AssertionError(error_msg)
+
+    # From the results report, extract the max throughput in packets per second
+    total_tx_rate = extract_value(last_result["output"], "total_tx_rate")
+    context.logger.info("add_packet_rate: max throughput of last characterization (pps): "
+                        f"{total_tx_rate:,}")
+
+    # Compute the desired packet rate
+    rate = round(total_tx_rate * percentage_float)
+    context.logger.info(f"add_packet_rate: percentage={percentage} rate(pps)={rate:,}")
+
+    # Build rate string using a representation understood by nfvbench
+    rate_str = str(rate) + "pps"
+
+    # Update context
+    context.percentage_rate = percentage
+    context.json['rate'] = rate_str
 
 
 """When steps."""
@@ -141,25 +210,52 @@ def add_percentage_rate(context, percentage_rate):
 @when('NFVbench API is ready')
 @when('NFVbench API is ready on host {host_ip}')
 @when('NFVbench API is ready on host {host_ip} and port {port:d}')
-def start_server(context, host_ip="127.0.0.1", port=7555):
-    context.host_ip = host_ip
-    context.port = port
+def start_server(context, host_ip: Optional[str]=None, port: Optional[int]=None):
+    """Start nfvbench server if needed and wait until it is ready.
+
+    Quickly check whether nfvbench HTTP server is ready by reading the "/status"
+    page.  If not, start the server locally.  Then wait until nfvbench API is
+    ready by polling the "/status" page.
+
+    This code is useful when behave and nfvbench run on the same machine.  In
+    particular, it is needed to run behave tests with nfvbench Docker container.
+
+    There is currently no way to prevent behave from starting automatically
+    nfvbench server when this is not desirable, for instance when behave is
+    started using ansible-role-nfvbench.  The user or the orchestration layer
+    should make sure nfvbench API is ready before starting behave tests.
+
+    """
+    # NFVbench server host IP and port number have been setup from environment variables (see
+    # environment.py:before_all()).   Here we allow to override them from feature files:
+    if host_ip is not None:
+        context.host_ip = host_ip
+    if port is not None:
+        context.port = port
+
+    nfvbench_test_url = "http://{ip}:{port}/status".format(ip=context.host_ip, port=context.port)
+    context.logger.info("start_server: test nfvbench API on URL: " + nfvbench_test_url)
+
     try:
         # check if API is already available
-        requests.get(
-            "http://{host_ip}:{port}/status".format(host_ip=context.host_ip, port=context.port))
+        requests.get(nfvbench_test_url)
     except RequestException:
+        context.logger.info("nfvbench server not running")
+
         cmd = ["nfvbench", "-c", context.data['config'], "--server"]
-        if host_ip != "127.0.0.1":
+        if context.host_ip != "127.0.0.1":
             cmd.append("--host")
-            cmd.append(host_ip)
-        if port != 7555:
+            cmd.append(context.host_ip)
+        if context.port != 7555:
             cmd.append("--port")
-            cmd.append(port)
+            cmd.append(str(context.port))
+
+        context.logger.info("Start nfvbench server with command: " + " ".join(cmd))
 
         subprocess.Popen(cmd, stdout=DEVNULL, stderr=subprocess.STDOUT)
 
-    test_nfvbench_api(context)
+    # Wait until nfvbench API is ready
+    test_nfvbench_api(nfvbench_test_url)
 
 
 """Then steps."""
@@ -167,40 +263,65 @@ def start_server(context, host_ip="127.0.0.1", port=7555):
 
 @then('run is started and waiting for result')
 @then('{repeat:d} runs are started and waiting for maximum result')
-def step_impl(context, repeat=1):
-    results = []
+def run_nfvbench_traffic(context, repeat=1):
+    context.logger.info(f"run_nfvbench_traffic: fs={context.json['frame_sizes'][0]} "
+                        f"fc={context.json['flow_count']} "
+                        f"rate={context.json['rate']} repeat={repeat}")
+
     if 'json' not in context.json:
+        # Build filename for nfvbench results in JSON format
         context.json['json'] = '/var/lib/xtesting/results/' + context.CASE_NAME + \
-                               '/nfvbench-' + context.tag + '-fs_' + \
-                               context.json['frame_sizes'][0] + '-fc_' + \
-                               context.json['flow_count'] + '-rate_' + \
-                               context.json['rate'] + '.json'
+                               '/nfvbench-' + context.tag + \
+                               '-fs_' + context.json['frame_sizes'][0] + \
+                               '-fc_' + context.json['flow_count']
+        if context.percentage_rate is not None:
+            # Add rate as a percentage, eg '-rate_70%'
+            context.json['json'] += '-rate_' + context.percentage_rate
+        else:
+            # Add rate in bits or packets per second, eg '-rate_15Gbps' or '-rate_10kpps'
+            context.json['json'] += '-rate_' + context.json['rate']
+        context.json['json'] += '.json'
+
     json_base_name = context.json['json']
+
+    max_total_tx_rate = None
+    # rem: don't init with 0 in case nfvbench gets crazy and returns a negative packet rate
+
     for i in range(repeat):
         if repeat > 1:
             context.json['json'] = json_base_name.strip('.json') + '-' + str(i) + '.json'
 
+        # Start nfvbench traffic and wait result:
         url = "http://{ip}:{port}/start_run".format(ip=context.host_ip, port=context.port)
         payload = json.dumps(context.json)
         r = requests.post(url, data=payload, headers={'Content-Type': 'application/json'})
         context.request_id = json.loads(r.text)["request_id"]
         assert r.status_code == 200
         result = wait_result(context)
-        results.append(result)
         assert result["status"] == STATUS_OK
 
+        # Extract useful metrics from result:
+        total_tx_rate = extract_value(result, "total_tx_rate")
+        overall = extract_value(result, "overall")
+        avg_delay_usec = extract_value(overall, "avg_delay_usec")
 
-    context.result = reduce(
-        lambda x, y: x if extract_value(x, "total_tx_rate") > extract_value(y,
-                                                                            "total_tx_rate") else y,
-        results)
+        # Log latest result:
+        context.logger.info(f"run_nfvbench_traffic: result #{i+1}: "
+                            f"total_tx_rate(pps)={total_tx_rate:,} "  # Add ',' thousand separator
+                            f"avg_latency_usec={round(avg_delay_usec)}")
 
-    total_tx_rate = extract_value(context.result, "total_tx_rate")
-    overall = extract_value(context.result, "overall")
-    avg_delay_usec = extract_value(overall, "avg_delay_usec")
-    # create a synthesis with offered pps and latency values
-    context.synthesis['total_tx_rate'] = total_tx_rate
-    context.synthesis['avg_delay_usec'] = avg_delay_usec
+        # Keep only the result with the highest packet rate:
+        if max_total_tx_rate is None or total_tx_rate > max_total_tx_rate:
+            max_total_tx_rate = total_tx_rate
+            context.result = result
+            context.synthesis['total_tx_rate'] = total_tx_rate
+            context.synthesis['avg_delay_usec'] = avg_delay_usec
+
+    # Log max result only when we did two nfvbench runs or more:
+    if repeat > 1:
+        context.logger.info(f"run_nfvbench_traffic: max result: "
+                            f"total_tx_rate(pps)={context.synthesis['total_tx_rate']:,} "
+                            f"avg_latency_usec={round(context.synthesis['avg_delay_usec'])}")
 
 
 @then('extract offered rate result')
@@ -248,12 +369,18 @@ def check_latency_result_against_fixed_threshold(context, max_avg_latency_usec:
     # Get the just measured average latency (a float):
     new_avg_latency_usec = context.synthesis['avg_delay_usec']
 
+    # Log what we test:
+    context.logger.info("check_latency_result_against_fixed_threshold(usec): "
+                        "{value}<={ref}?".format(
+                            value=round(new_avg_latency_usec),
+                            ref=round(max_avg_latency_usec)))
+
     # Compare measured value to reference:
     if new_avg_latency_usec > max_avg_latency_usec:
         raise AssertionError("Average latency higher than max threshold: "
-                             "{avg_latency} usec > {threshold} usec".format(
-                                 avg_latency=round(new_avg_latency_usec),
-                                 threshold=round(max_avg_latency_usec)))
+                             "{value} usec > {ref} usec".format(
+                                 value=round(new_avg_latency_usec),
+                                 ref=round(max_avg_latency_usec)))
 
 
 @then(
@@ -314,9 +441,9 @@ def push_result_database(context):
 
 
 @retry(AssertionError, tries=24, delay=5.0, logger=None)
-def test_nfvbench_api(context):
+def test_nfvbench_api(nfvbench_test_url: str):
     try:
-        r = requests.get("http://{ip}:{port}/status".format(ip=context.host_ip, port=context.port))
+        r = requests.get(nfvbench_test_url)
         assert r.status_code == 200
         assert json.loads(r.text)["error_message"] == "no pending NFVbench run"
     except RequestException as exc:
@@ -424,18 +551,6 @@ def latency_comparison(context, old_latency=None, threshold=None, reference_valu
                     max_reference_value=Formatter.standard(reference_values[1])))
 
 
-def get_result_from_input_values(input, result):
-    # Select required keys (other keys can be not set or unconsistent between scenarios)
-    required_keys = ['duration_sec', 'frame_sizes', 'flow_count', 'rate']
-    if 'user_label' in result:
-        required_keys.append('user_label')
-    if 'flavor_type' in result:
-        required_keys.append('flavor_type')
-    subset_input = dict((k, input[k]) for k in required_keys if k in input)
-    subset_result = dict((k, result[k]) for k in required_keys if k in result)
-    return subset_input == subset_result
-
-
 def extract_value(obj, key):
     """Pull all values of specified key from nested JSON."""
     arr = []
@@ -457,28 +572,58 @@ def extract_value(obj, key):
     return results[0]
 
 
-def get_last_result(context, reference=None, page=None):
+def get_last_result(context, reference: bool = False):
+    """Look for a previous result in TestAPI database.
+
+    Search TestAPI results from newest to oldest and return the first result
+    record matching the context constraints.  Log an overview of the results
+    found (max rate pps, avg delay usec, test conditions, date of measurement).
+
+    The result record test case must match the current test case
+    ('characterization' or 'non-regression') unless `reference` is set to True.
+
+    The result record scenario tag must match the current scenario tag
+    ('throughput' or 'latency').
+
+    Args:
+        context: behave context including project name, test case name, traffic
+            configuration (frame size, flow count, test duration), type of the
+            compute node under test (via loop VM flavor_type) and platform (via
+            user_label).
+
+        reference: when True, look for results with the 'characterization' test
+            case name instead of the current test case name.
+
+    Returns:
+        a JSON dictionary with the results, ie a dict with the keys "input",
+            "output" and "synthesis" when the scenario tag is 'throughput' or
+            'latency'
+    """
     if reference:
         case_name = 'characterization'
     else:
         case_name = context.CASE_NAME
-    url = context.data['TEST_DB_URL'] + '?project={project_name}&case={case_name}'.format(
-        project_name=context.data['PROJECT_NAME'], case_name=case_name)
-    if context.data['INSTALLER_TYPE']:
-        url += '&installer={installer_name}'.format(installer_name=context.data['INSTALLER_TYPE'])
-    if context.data['NODE_NAME']:
-        url += '&pod={pod_name}'.format(pod_name=context.data['NODE_NAME'])
-    url += '&criteria=PASS'
-    if page:
-        url += '&page={page}'.format(page=page)
-    last_results = requests.get(url)
-    assert last_results.status_code == 200
-    last_results = json.loads(last_results.text)
-    for result in last_results["results"]:
-        for tagged_result in result["details"]["results"][context.tag]:
-            if get_result_from_input_values(tagged_result["input"], context.json):
-                return tagged_result
-    if last_results["pagination"]["current_page"] < last_results["pagination"]["total_pages"]:
-        page = last_results["pagination"]["current_page"] + 1
-        return get_last_result(context, page)
-    return None
+    testapi_params = {"project_name": context.data['PROJECT_NAME'],
+                      "case_name": case_name}
+    testapi_client = TestapiClient(testapi_url=context.data['TEST_DB_URL'])
+    last_result = testapi_client.find_last_result(testapi_params,
+                                                  scenario_tag=context.tag,
+                                                  nfvbench_test_input=context.json)
+    if last_result is None:
+        error_msg = "get_last_result: No result found in TestAPI database:"
+        error_msg += f" case_name={case_name} scenario_tag={context.tag} "
+        error_msg += nfvbench_input_to_str(context.json)
+        context.logger.error(error_msg)
+        raise AssertionError(error_msg)
+
+    # Log an overview of the last result (latency and max throughput)
+    measurement_date = last_result["output"]["result"]["date"]
+    total_tx_rate = extract_value(last_result["output"], "total_tx_rate")
+    avg_delay_usec = extract_value(extract_value(last_result["output"], "overall"),
+                                   "avg_delay_usec")
+    context.logger.info(f"get_last_result: case_name={case_name} scenario_tag={context.tag}"
+                        f' measurement_date="{measurement_date}"'
+                        f" total_tx_rate(pps)={total_tx_rate:,}"
+                        f" avg_latency_usec={round(avg_delay_usec)}")
+
+    return last_result