bug fix: score calculation based on build_tag
[releng.git] / utils / test / reporting / reporting / utils / reporting_utils.py
index 6282091..58a0c62 100644 (file)
@@ -6,7 +6,6 @@
 #
 # http://www.apache.org/licenses/LICENSE-2.0
 #
-from urllib2 import Request, urlopen, URLError
 import logging
 import json
 import os
@@ -14,21 +13,23 @@ import requests
 import pdfkit
 import yaml
 
+from urllib2 import Request, urlopen, URLError
+
 
 # ----------------------------------------------------------
 #
 #               YAML UTILS
 #
 # -----------------------------------------------------------
-def get_parameter_from_yaml(parameter, file):
+def get_parameter_from_yaml(parameter, config_file):
     """
     Returns the value of a given parameter in file.yaml
     parameter must be given in string format with dots
     Example: general.openstack.image_name
     """
-    with open(file) as f:
-        file_yaml = yaml.safe_load(f)
-    f.close()
+    with open(config_file) as my_file:
+        file_yaml = yaml.safe_load(my_file)
+    my_file.close()
     value = file_yaml
     for element in parameter.split("."):
         value = value.get(element)
@@ -39,6 +40,9 @@ def get_parameter_from_yaml(parameter, file):
 
 
 def get_config(parameter):
+    """
+    Get configuration parameter from yaml configuration file
+    """
     yaml_ = os.environ["CONFIG_REPORTING_YAML"]
     return get_parameter_from_yaml(parameter, yaml_)
 
@@ -49,20 +53,23 @@ def get_config(parameter):
 #
 # -----------------------------------------------------------
 def getLogger(module):
-    logFormatter = logging.Formatter("%(asctime)s [" +
-                                     module +
-                                     "] [%(levelname)-5.5s]  %(message)s")
+    """
+    Get Logger
+    """
+    log_formatter = logging.Formatter("%(asctime)s [" +
+                                      module +
+                                      "] [%(levelname)-5.5s]  %(message)s")
     logger = logging.getLogger()
     log_file = get_config('general.log.log_file')
     log_level = get_config('general.log.log_level')
 
-    fileHandler = logging.FileHandler("{0}/{1}".format('.', log_file))
-    fileHandler.setFormatter(logFormatter)
-    logger.addHandler(fileHandler)
+    file_handler = logging.FileHandler("{0}/{1}".format('.', log_file))
+    file_handler.setFormatter(log_formatter)
+    logger.addHandler(file_handler)
 
-    consoleHandler = logging.StreamHandler()
-    consoleHandler.setFormatter(logFormatter)
-    logger.addHandler(consoleHandler)
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(log_formatter)
+    logger.addHandler(console_handler)
     logger.setLevel(log_level)
     return logger
 
@@ -73,6 +80,9 @@ def getLogger(module):
 #
 # -----------------------------------------------------------
 def getApiResults(case, installer, scenario, version):
+    """
+    Get Results by calling the API
+    """
     results = json.dumps([])
     # to remove proxy (to be removed at the end for local test only)
     # proxy_handler = urllib2.ProxyHandler({})
@@ -94,29 +104,33 @@ def getApiResults(case, installer, scenario, version):
         response = urlopen(request)
         k = response.read()
         results = json.loads(k)
-    except URLError as e:
-        print 'No kittez. Got an error code:'.format(e)
+    except URLError:
+        print "Error when retrieving results form API"
 
     return results
 
 
-def getScenarios(case, installer, version):
-
-    try:
-        case = case.getName()
-    except:
-        # if case is not an object test case, try the string
-        if type(case) == str:
-            case = case
-        else:
-            raise ValueError("Case cannot be evaluated")
-
+def getScenarios(project, case, installer, version):
+    """
+    Get the list of Scenarios
+    """
+    test_results = None
+    scenario_results = None
     period = get_config('general.period')
     url_base = get_config('testapi.url')
 
-    url = ("http://" + url_base + "?case=" + case +
-           "&period=" + str(period) + "&installer=" + installer +
-           "&version=" + version)
+    url = ("http://" + url_base +
+           "?installer=" + installer +
+           "&period=" + str(period))
+
+    if version is not None:
+        url += "&version=" + version
+
+    if project is not None:
+        url += "&project=" + project
+
+    if case is not None:
+        url += "&case=" + case
 
     try:
         request = Request(url)
@@ -136,7 +150,7 @@ def getScenarios(case, installer, version):
                     results = json.loads(k)
                     test_results += results['results']
         except KeyError:
-            print ('No pagination detected')
+            print "No pagination detected"
     except URLError as err:
         print 'Got an error code: {}'.format(err)
 
@@ -144,32 +158,37 @@ def getScenarios(case, installer, version):
         test_results.reverse()
         scenario_results = {}
 
-        for r in test_results:
+        for my_result in test_results:
             # Retrieve all the scenarios per installer
-            if not r['scenario'] in scenario_results.keys():
-                scenario_results[r['scenario']] = []
+            if not my_result['scenario'] in scenario_results.keys():
+                scenario_results[my_result['scenario']] = []
             # Do we consider results from virtual pods ...
             # Do we consider results for non HA scenarios...
             exclude_virtual_pod = get_config('functest.exclude_virtual')
             exclude_noha = get_config('functest.exclude_noha')
-            if ((exclude_virtual_pod and "virtual" in r['pod_name']) or
-                    (exclude_noha and "noha" in r['scenario'])):
+            if ((exclude_virtual_pod and "virtual" in my_result['pod_name']) or
+                    (exclude_noha and "noha" in my_result['scenario'])):
                 print "exclude virtual pod results..."
             else:
-                scenario_results[r['scenario']].append(r)
+                scenario_results[my_result['scenario']].append(my_result)
 
     return scenario_results
 
 
 def getScenarioStats(scenario_results):
+    """
+    Get the number of occurence of scenarios over the defined PERIOD
+    """
     scenario_stats = {}
-    for k, v in scenario_results.iteritems():
-        scenario_stats[k] = len(v)
-
+    for res_k, res_v in scenario_results.iteritems():
+        scenario_stats[res_k] = len(res_v)
     return scenario_stats
 
 
 def getScenarioStatus(installer, version):
+    """
+    Get the status of a scenariofor Yardstick
+    """
     period = get_config('general.period')
     url_base = get_config('testapi.url')
 
@@ -184,33 +203,46 @@ def getScenarioStatus(installer, version):
         response.close()
         results = json.loads(k)
         test_results = results['results']
-    except URLError as e:
-        print 'Got an error code: {}'.format(e)
+    except URLError:
+        print "GetScenarioStatus: error when calling the API"
 
-    scenario_results = {}
-    result_dict = {}
+    x86 = 'x86'
+    aarch64 = 'aarch64'
+    scenario_results = {x86: {}, aarch64: {}}
+    result_dict = {x86: {}, aarch64: {}}
     if test_results is not None:
-        for r in test_results:
-            if r['stop_date'] != 'None' and r['criteria'] is not None:
-                if not r['scenario'] in scenario_results.keys():
-                    scenario_results[r['scenario']] = []
-                scenario_results[r['scenario']].append(r)
-
-        for k, v in scenario_results.items():
-            # scenario_results[k] = v[:LASTEST_TESTS]
-            s_list = []
-            for element in v:
-                if element['criteria'] == 'SUCCESS':
-                    s_list.append(1)
+        for test_r in test_results:
+            if (test_r['stop_date'] != 'None' and
+                    test_r['criteria'] is not None):
+                scenario_name = test_r['scenario']
+                if 'arm' in test_r['pod_name']:
+                    if not test_r['scenario'] in scenario_results[aarch64]:
+                        scenario_results[aarch64][scenario_name] = []
+                    scenario_results[aarch64][scenario_name].append(test_r)
                 else:
-                    s_list.append(0)
-            result_dict[k] = s_list
+                    if not test_r['scenario'] in scenario_results[x86]:
+                        scenario_results[x86][scenario_name] = []
+                    scenario_results[x86][scenario_name].append(test_r)
+
+        for key in scenario_results:
+            for scen_k, scen_v in scenario_results[key].items():
+                # scenario_results[k] = v[:LASTEST_TESTS]
+                s_list = []
+                for element in scen_v:
+                    if element['criteria'] == 'PASS':
+                        s_list.append(1)
+                    else:
+                        s_list.append(0)
+                result_dict[key][scen_k] = s_list
 
     # return scenario_results
     return result_dict
 
 
 def getQtipResults(version, installer):
+    """
+    Get QTIP results
+    """
     period = get_config('qtip.period')
     url_base = get_config('testapi.url')
 
@@ -240,19 +272,24 @@ def getQtipResults(version, installer):
 
 
 def getNbtestOk(results):
+    """
+    based on default value (PASS) count the number of test OK
+    """
     nb_test_ok = 0
-    for r in results:
-        for k, v in r.iteritems():
+    for my_result in results:
+        for res_k, res_v in my_result.iteritems():
             try:
-                if "PASS" in v:
+                if "PASS" in res_v:
                     nb_test_ok += 1
-            except:
+            except Exception:
                 print "Cannot retrieve test status"
     return nb_test_ok
 
 
-def getResult(testCase, installer, scenario, version):
-
+def getCaseScore(testCase, installer, scenario, version):
+    """
+    Get Result  for a given Functest Testcase
+    """
     # retrieve raw results
     results = getApiResults(testCase, installer, scenario, version)
     # let's concentrate on test results only
@@ -269,10 +306,10 @@ def getResult(testCase, installer, scenario, version):
         # print " ---------------- "
         # print "nb of results:" + str(len(test_results))
 
-        for r in test_results:
+        for res_r in test_results:
             # print r["start_date"]
             # print r["criteria"]
-            scenario_results.append({r["start_date"]: r["criteria"]})
+            scenario_results.append({res_r["start_date"]: res_r["criteria"]})
         # sort results
         scenario_results.sort()
         # 4 levels for the results
@@ -295,7 +332,7 @@ def getResult(testCase, installer, scenario, version):
             test_result_indicator = 1
         else:
             # Test the last 4 run
-            if (len(scenario_results) > 3):
+            if len(scenario_results) > 3:
                 last4runResults = scenario_results[-4:]
                 nbTestOkLast4 = getNbtestOk(last4runResults)
                 # print "Nb test OK (last 4 run):"+ str(nbTestOkLast4)
@@ -308,20 +345,58 @@ def getResult(testCase, installer, scenario, version):
     return test_result_indicator
 
 
+def getCaseScoreFromBuildTag(testCase, s_results):
+    """
+    Get Results for a given Functest Testcase with arch filtering
+    """
+    url_base = get_config('testapi.url')
+    nb_tests = get_config('general.nb_iteration_tests_success_criteria')
+    test_result_indicator = 0
+    # architecture is not a result field...so we cannot use getResult as it is
+    res_matrix = []
+    try:
+        for s_result in s_results:
+            build_tag = s_result['build_tag']
+            d = s_result['start_date']
+            res_matrix.append({'date': d,
+                               'build_tag': build_tag})
+        # sort res_matrix
+        filter_res_matrix = sorted(res_matrix, key=lambda k: k['date'],
+                                   reverse=True)[:nb_tests]
+        for my_res in filter_res_matrix:
+            url = ("http://" + url_base + "?case=" + testCase +
+                   "&build_tag=" + my_res['build_tag'])
+            request = Request(url)
+            response = urlopen(request)
+            k = response.read()
+            results = json.loads(k)
+            if "PASS" in results['results'][0]['criteria']:
+                test_result_indicator += 1
+    except:
+        print "No results found for this case"
+    if test_result_indicator > 2:
+        test_result_indicator = test_result_indicator - 1
+
+    return test_result_indicator
+
+
 def getJenkinsUrl(build_tag):
-    # e.g. jenkins-functest-apex-apex-daily-colorado-daily-colorado-246
-    # id = 246
-    # jenkins-functest-compass-huawei-pod5-daily-master-136
-    # id = 136
-    # note it is linked to jenkins format
-    # if this format changes...function to be adapted....
+    """
+    Get Jenkins url_base corespoding to the last test CI run
+    e.g. jenkins-functest-apex-apex-daily-colorado-daily-colorado-246
+    id = 246
+    jenkins-functest-compass-huawei-pod5-daily-master-136
+    id = 136
+    note it is linked to jenkins format
+    if this format changes...function to be adapted....
+    """
     url_base = get_config('functest.jenkins_url')
     try:
         build_id = [int(s) for s in build_tag.split("-") if s.isdigit()]
         url_id = (build_tag[8:-(len(str(build_id[0])) + 1)] +
                   "/" + str(build_id[0]))
         jenkins_url = url_base + url_id + "/console"
-    except:
+    except Exception:
         print 'Impossible to get jenkins url:'
 
     if "jenkins-" not in build_tag:
@@ -331,10 +406,13 @@ def getJenkinsUrl(build_tag):
 
 
 def getScenarioPercent(scenario_score, scenario_criteria):
+    """
+    Get success rate of the scenario (in %)
+    """
     score = 0.0
     try:
         score = float(scenario_score) / float(scenario_criteria) * 100
-    except:
+    except Exception:
         print 'Impossible to calculate the percentage score'
     return score
 
@@ -343,32 +421,41 @@ def getScenarioPercent(scenario_score, scenario_criteria):
 # Functest
 # *********
 def getFunctestConfig(version=""):
+    """
+    Get Functest configuration
+    """
     config_file = get_config('functest.test_conf') + version
     response = requests.get(config_file)
     return yaml.safe_load(response.text)
 
 
 def getArchitectures(scenario_results):
+    """
+    Get software architecture (x86 or Aarch64)
+    """
     supported_arch = ['x86']
-    if (len(scenario_results) > 0):
+    if len(scenario_results) > 0:
         for scenario_result in scenario_results.values():
             for value in scenario_result:
-                if ("armband" in value['build_tag']):
+                if "armband" in value['build_tag']:
                     supported_arch.append('aarch64')
                     return supported_arch
     return supported_arch
 
 
 def filterArchitecture(results, architecture):
+    """
+    Restrict the list of results based on given architecture
+    """
     filtered_results = {}
-    for name, results in results.items():
+    for name, res in results.items():
         filtered_values = []
-        for value in results:
-            if (architecture is "x86"):
+        for value in res:
+            if architecture is "x86":
                 # drop aarch64 results
                 if ("armband" not in value['build_tag']):
                     filtered_values.append(value)
-            elif(architecture is "aarch64"):
+            elif architecture is "aarch64":
                 # drop x86 results
                 if ("armband" in value['build_tag']):
                     filtered_values.append(value)
@@ -381,6 +468,9 @@ def filterArchitecture(results, architecture):
 # Yardstick
 # *********
 def subfind(given_list, pattern_list):
+    """
+    Yardstick util function
+    """
     LASTEST_TESTS = get_config('general.nb_iteration_tests_success_criteria')
     for i in range(len(given_list)):
         if given_list[i] == pattern_list[0] and \
@@ -390,7 +480,9 @@ def subfind(given_list, pattern_list):
 
 
 def _get_percent(status):
-
+    """
+    Yardstick util function to calculate success rate
+    """
     if status * 100 % 6:
         return round(float(status) * 100 / 6, 1)
     else:
@@ -398,13 +490,16 @@ def _get_percent(status):
 
 
 def get_percent(four_list, ten_list):
+    """
+    Yardstick util function to calculate success rate
+    """
     four_score = 0
     ten_score = 0
 
-    for v in four_list:
-        four_score += v
-    for v in ten_list:
-        ten_score += v
+    for res_v in four_list:
+        four_score += res_v
+    for res_v in ten_list:
+        ten_score += res_v
 
     LASTEST_TESTS = get_config('general.nb_iteration_tests_success_criteria')
     if four_score == LASTEST_TESTS:
@@ -420,9 +515,12 @@ def get_percent(four_list, ten_list):
 
 
 def _test():
+    """
+    Yardstick util function (test)
+    """
     status = getScenarioStatus("compass", "master")
     print "status:++++++++++++++++++++++++"
-    print(json.dumps(status, indent=4))
+    print json.dumps(status, indent=4)
 
 
 # ----------------------------------------------------------
@@ -432,8 +530,9 @@ def _test():
 # -----------------------------------------------------------
 
 def export_csv(scenario_file_name, installer, version):
-    # csv
-    # generate sub files based on scenario_history.txt
+    """
+    Generate sub files based on scenario_history.txt
+    """
     scenario_installer_file_name = ("./display/" + version +
                                     "/functest/scenario_history_" +
                                     installer + ".csv")
@@ -443,21 +542,25 @@ def export_csv(scenario_file_name, installer, version):
         for line in scenario_file:
             if installer in line:
                 scenario_installer_file.write(line)
-        scenario_installer_file.close
+    scenario_installer_file.close
 
 
 def generate_csv(scenario_file):
+    """
+    Generate sub files based on scenario_history.txt
+    """
     import shutil
-    # csv
-    # generate sub files based on scenario_history.txt
     csv_file = scenario_file.replace('txt', 'csv')
     shutil.copy2(scenario_file, csv_file)
 
 
 def export_pdf(pdf_path, pdf_doc_name):
+    """
+    Export results to pdf
+    """
     try:
         pdfkit.from_file(pdf_path, pdf_doc_name)
     except IOError:
         print "Error but pdf generated anyway..."
-    except:
+    except Exception:
         print "impossible to generate PDF"