merge the N-days and whole-database processing paths, and never re-publish data that already exists in Elasticsearch
[releng.git] / utils / test / scripts / mongo_to_elasticsearch.py
index 8c36d30..b722793 100644 (file)
 #! /usr/bin/env python
-import logging
-import argparse
-import shared_utils
+import datetime
 import json
-import urlparse
-import uuid
 import os
 import subprocess
-import datetime
-
-logger = logging.getLogger('mongo_to_elasticsearch')
-logger.setLevel(logging.DEBUG)
-file_handler = logging.FileHandler('/var/log/{}.log'.format(__name__))
-file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
-logger.addHandler(file_handler)
-
-
-def _get_dicts_from_list(testcase, dict_list, keys):
-    dicts = []
-    for dictionary in dict_list:
-        # iterate over dictionaries in input list
-        if not isinstance(dictionary, dict):
-            logger.info("Skipping non-dict details testcase [{}]".format(testcase))
-            continue
-        if keys == set(dictionary.keys()):
-            # check the dictionary structure
-            dicts.append(dictionary)
-    return dicts
-
-
-def _get_results_from_list_of_dicts(list_of_dict_statuses, dict_indexes, expected_results=None):
-    test_results = {}
-    for test_status in list_of_dict_statuses:
-        status = test_status
-        for index in dict_indexes:
-            status = status[index]
-        if status in test_results:
-            test_results[status] += 1
-        else:
-            test_results[status] = 1
-
-    if expected_results is not None:
-        for expected_result in expected_results:
-            if expected_result not in test_results:
-                test_results[expected_result] = 0
-
-    return test_results
-
-
-def _convert_value(value):
-    return value if value != '' else 0
-
-
-def _convert_duration(duration):
-    if (isinstance(duration, str) or isinstance(duration, unicode)) and ':' in duration:
-        hours, minutes, seconds = duration.split(":")
-        hours = _convert_value(hours)
-        minutes = _convert_value(minutes)
-        seconds = _convert_value(seconds)
-        int_duration = 3600 * int(hours) + 60 * int(minutes) + float(seconds)
-    else:
-        int_duration = duration
-    return int_duration
-
-
-def modify_functest_tempest(testcase):
-    if modify_default_entry(testcase):
-        testcase_details = testcase['details']
-        testcase_tests = float(testcase_details['tests'])
-        testcase_failures = float(testcase_details['failures'])
-        if testcase_tests != 0:
-            testcase_details['success_percentage'] = 100 * (testcase_tests - testcase_failures) / testcase_tests
-        else:
-            testcase_details['success_percentage'] = 0
-        return True
-    else:
-        return False
-
-
-def modify_functest_vims(testcase):
-    """
-    Structure:
-        details.sig_test.result.[{result}]
-        details.sig_test.duration
-        details.vIMS.duration
-        details.orchestrator.duration
-
-    Find data for these fields
-        -> details.sig_test.duration
-        -> details.sig_test.tests
-        -> details.sig_test.failures
-        -> details.sig_test.passed
-        -> details.sig_test.skipped
-        -> details.vIMS.duration
-        -> details.orchestrator.duration
-    """
-    testcase_details = testcase['details']
-    sig_test_results = _get_dicts_from_list(testcase, testcase_details['sig_test']['result'],
-                                            {'duration', 'result', 'name', 'error'})
-    if len(sig_test_results) < 1:
-        logger.info("No 'result' from 'sig_test' found in vIMS details, skipping")
-        return False
-    else:
-        test_results = _get_results_from_list_of_dicts(sig_test_results, ('result',), ('Passed', 'Skipped', 'Failed'))
-        passed = test_results['Passed']
-        skipped = test_results['Skipped']
-        failures = test_results['Failed']
-        all_tests = passed + skipped + failures
-        testcase['details'] = {
-            'sig_test': {
-                'duration': testcase_details['sig_test']['duration'],
-                'tests': all_tests,
-                'failures': failures,
-                'passed': passed,
-                'skipped': skipped
-            },
-            'vIMS': {
-                'duration': testcase_details['vIMS']['duration']
-            },
-            'orchestrator': {
-                'duration': testcase_details['orchestrator']['duration']
-            }
-        }
-        return True
-
-
-def modify_functest_onos(testcase):
-    """
-    Structure:
-        details.FUNCvirNet.duration
-        details.FUNCvirNet.status.[{Case result}]
-        details.FUNCvirNetL3.duration
-        details.FUNCvirNetL3.status.[{Case result}]
-
-    Find data for these fields
-        -> details.FUNCvirNet.duration
-        -> details.FUNCvirNet.tests
-        -> details.FUNCvirNet.failures
-        -> details.FUNCvirNetL3.duration
-        -> details.FUNCvirNetL3.tests
-        -> details.FUNCvirNetL3.failures
-    """
-    testcase_details = testcase['details']
-
-    funcvirnet_details = testcase_details['FUNCvirNet']['status']
-    funcvirnet_statuses = _get_dicts_from_list(testcase, funcvirnet_details, {'Case result', 'Case name:'})
-
-    funcvirnetl3_details = testcase_details['FUNCvirNetL3']['status']
-    funcvirnetl3_statuses = _get_dicts_from_list(testcase, funcvirnetl3_details, {'Case result', 'Case name:'})
-
-    if len(funcvirnet_statuses) < 0:
-        logger.info("No results found in 'FUNCvirNet' part of ONOS results")
-        return False
-    elif len(funcvirnetl3_statuses) < 0:
-        logger.info("No results found in 'FUNCvirNetL3' part of ONOS results")
-        return False
-    else:
-        funcvirnet_results = _get_results_from_list_of_dicts(funcvirnet_statuses,
-                                                             ('Case result',), ('PASS', 'FAIL'))
-        funcvirnetl3_results = _get_results_from_list_of_dicts(funcvirnetl3_statuses,
-                                                               ('Case result',), ('PASS', 'FAIL'))
-
-        funcvirnet_passed = funcvirnet_results['PASS']
-        funcvirnet_failed = funcvirnet_results['FAIL']
-        funcvirnet_all = funcvirnet_passed + funcvirnet_failed
-
-        funcvirnetl3_passed = funcvirnetl3_results['PASS']
-        funcvirnetl3_failed = funcvirnetl3_results['FAIL']
-        funcvirnetl3_all = funcvirnetl3_passed + funcvirnetl3_failed
-
-        testcase_details['FUNCvirNet'] = {
-            'duration': _convert_duration(testcase_details['FUNCvirNet']['duration']),
-            'tests': funcvirnet_all,
-            'failures': funcvirnet_failed
-        }
-
-        testcase_details['FUNCvirNetL3'] = {
-            'duration': _convert_duration(testcase_details['FUNCvirNetL3']['duration']),
-            'tests': funcvirnetl3_all,
-            'failures': funcvirnetl3_failed
-        }
-
-        return True
-
-
-def modify_functest_rally(testcase):
-    """
-    Structure:
-        details.[{summary.duration}]
-        details.[{summary.nb success}]
-        details.[{summary.nb tests}]
-
-    Find data for these fields
-        -> details.duration
-        -> details.tests
-        -> details.success_percentage
-    """
-    summaries = _get_dicts_from_list(testcase, testcase['details'], {'summary'})
-
-    if len(summaries) != 1:
-        logger.info("Found zero or more than one 'summaries' in Rally details, skipping")
-        return False
-    else:
-        summary = summaries[0]['summary']
-        testcase['details'] = {
-            'duration': summary['duration'],
-            'tests': summary['nb tests'],
-            'success_percentage': summary['nb success']
-        }
-        return True
+import traceback
+import urlparse
+import uuid
 
+import argparse
 
-def modify_functest_odl(testcase):
-    """
-    Structure:
-        details.details.[{test_status.@status}]
+import conf_utils
+import logger_utils
+import mongo2elastic_format
+import shared_utils
 
-    Find data for these fields
-        -> details.tests
-        -> details.failures
-        -> details.success_percentage?
-    """
-    test_statuses = _get_dicts_from_list(testcase, testcase['details']['details'],
-                                         {'test_status', 'test_doc', 'test_name'})
-    if len(test_statuses) < 1:
-        logger.info("No 'test_status' found in ODL details, skipping")
-        return False
-    else:
-        test_results = _get_results_from_list_of_dicts(test_statuses, ('test_status', '@status'), ('PASS', 'FAIL'))
+logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get
 
-        passed_tests = test_results['PASS']
-        failed_tests = test_results['FAIL']
-        all_tests = passed_tests + failed_tests
+parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
+parser.add_argument('-od', '--output-destination',
+                    default='elasticsearch',
+                    choices=('elasticsearch', 'stdout'),
+                    help='defaults to elasticsearch')
 
-        testcase['details'] = {
-            'tests': all_tests,
-            'failures': failed_tests,
-            'success_percentage': 100 * passed_tests / float(all_tests)
-        }
-        return True
+parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
+                    help='get entries old at most N days from mongodb and'
+                         ' parse those that are not already in elasticsearch.'
+                         ' If not present, will get everything from mongodb, which is the default')
 
+parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
+                    help='the url of elasticsearch, defaults to http://localhost:9200')
 
-def modify_default_entry(testcase):
-    """
-    Look for these and leave any of those:
-        details.duration
-        details.tests
-        details.failures
+parser.add_argument('-u', '--elasticsearch-username', default=None,
+                    help='The username with password for elasticsearch in format username:password')
 
-    If none are present, then return False
-    """
-    found = False
-    testcase_details = testcase['details']
-    fields = ['duration', 'tests', 'failures']
-    if isinstance(testcase_details, dict):
-        for key, value in testcase_details.items():
-            if key in fields:
-                found = True
-                if key == 'duration':
-                    testcase_details[key] = _convert_duration(value)
-            else:
-                del testcase_details[key]
+args = parser.parse_args()
 
-    return found
+tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
 
 
 def _fix_date(date_string):
@@ -272,7 +45,7 @@ def _fix_date(date_string):
         return date_string[:-3].replace(' ', 'T') + 'Z'
 
 
-def verify_mongo_entry(testcase):
+def verify_document(testcase):
     """
     Mandatory fields:
         installer
@@ -296,7 +69,8 @@ def verify_mongo_entry(testcase):
                         'case_name',
                         'project_name',
                         'details']
-    mandatory_fields_to_modify = {'creation_date': _fix_date}
+    mandatory_fields_to_modify = {'start_date': _fix_date}
+    fields_to_swap_or_add = {'scenario': 'version'}
     if '_id' in testcase:
         mongo_id = testcase['_id']
     else:
@@ -320,6 +94,15 @@ def verify_mongo_entry(testcase):
             else:
                 testcase[key] = mandatory_fields_to_modify[key](value)
                 del mandatory_fields_to_modify[key]
+        elif key in fields_to_swap_or_add:
+            if value is None:
+                swapped_key = fields_to_swap_or_add[key]
+                swapped_value = testcase[swapped_key]
+                logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key, swapped_value))
+                testcase[key] = swapped_value
+                del fields_to_swap_or_add[key]
+            else:
+                del fields_to_swap_or_add[key]
         elif key in optional_fields:
             if value is None:
                 # empty optional field, remove
@@ -334,126 +117,83 @@ def verify_mongo_entry(testcase):
         logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
                     " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
         return False
+    elif len(mandatory_fields_to_modify) > 0:
+        # some mandatory fields are missing
+        logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
+                    " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
+        return False
     else:
+        if len(fields_to_swap_or_add) > 0:
+            for key, swap_key in fields_to_swap_or_add.iteritems():
+                testcase[key] = testcase[swap_key]
+
         return True
 
 
-def modify_mongo_entry(testcase):
+def format_document(testcase):
     # 1. verify and identify the testcase
     # 2. if modification is implemented, then use that
     # 3. if not, try to use default
     # 4. if 2 or 3 is successful, return True, otherwise return False
-    if verify_mongo_entry(testcase):
+    if verify_document(testcase):
         project = testcase['project_name']
         case_name = testcase['case_name']
-        if project == 'functest':
-            if case_name == 'Rally':
-                return modify_functest_rally(testcase)
-            elif case_name == 'ODL':
-                return modify_functest_odl(testcase)
-            elif case_name == 'ONOS':
-                return modify_functest_onos(testcase)
-            elif case_name == 'vIMS':
-                return modify_functest_vims(testcase)
-            elif case_name == 'Tempest':
-                return modify_functest_tempest(testcase)
-        return modify_default_entry(testcase)
+        fmt = conf_utils.get_format(project, case_name)
+        if fmt:
+            try:
+                logger.info("Processing %s/%s using format %s" % (project, case_name, fmt))
+                return vars(mongo2elastic_format)[fmt](testcase)
+            except Exception:
+                logger.error("Fail in format testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
+                return False
     else:
         return False
 
 
-def publish_mongo_data(output_destination):
-    tmp_filename = 'mongo-{}.log'.format(uuid.uuid4())
-    try:
-        subprocess.check_call(['mongoexport', '--db', 'test_results_collection', '-c', 'results', '--out',
-                               tmp_filename])
-        with open(tmp_filename) as fobj:
-            for mongo_json_line in fobj:
-                test_result = json.loads(mongo_json_line)
-                if modify_mongo_entry(test_result):
-                    shared_utils.publish_json(test_result, es_user, es_passwd, output_destination)
-    finally:
-        if os.path.exists(tmp_filename):
-            os.remove(tmp_filename)
-
+def export_documents(days):
+    cmd = ['mongoexport', '--db', 'test_results_collection', '-c', 'results']
+    if days > 0:
+        past_time = datetime.datetime.today() - datetime.timedelta(days=days)
+        cmd += ['--query', '{{"start_date":{{$gt:"{}"}}}}'.format(past_time)]
+    cmd += [ '--out', '{}'.format(tmp_docs_file)]
 
-def get_mongo_data(days):
-    past_time = datetime.datetime.today() - datetime.timedelta(days=days)
-    mongo_json_lines = subprocess.check_output(['mongoexport', '--db', 'test_results_collection', '-c', 'results',
-                                                '--query', '{{"creation_date":{{$gt:"{}"}}}}'
-                                               .format(past_time)]).splitlines()
-
-    mongo_data = []
-    for mongo_json_line in mongo_json_lines:
-        test_result = json.loads(mongo_json_line)
-        if modify_mongo_entry(test_result):
-            # if the modification could be applied, append the modified result
-            mongo_data.append(test_result)
-    return mongo_data
+    try:
+        subprocess.check_call(cmd)
+    except Exception, err:
+        logger.error("export mongodb failed: %s" % err)
+        exit(-1)
 
 
-def publish_difference(mongo_data, elastic_data, output_destination, es_user, es_passwd):
-    for elastic_entry in elastic_data:
-        if elastic_entry in mongo_data:
-            mongo_data.remove(elastic_entry)
+def publish_document(document, es_creds, to):
+    status, data = shared_utils.publish_json(document, es_creds, to)
+    if status > 300:
+        logger.error('Publish record[{}] failed, due to [{}]'
+                    .format(document, json.loads(data)['error']['reason']))
 
-    logger.info('number of parsed test results: {}'.format(len(mongo_data)))
 
-    for parsed_test_result in mongo_data:
-        shared_utils.publish_json(parsed_test_result, es_user, es_passwd, output_destination)
+def publish_nonexist_documents(elastic_docs, es_creds, to):
+    try:
+        with open(tmp_docs_file) as fdocs:
+            for doc_line in fdocs:
+                doc = json.loads(doc_line)
+                if format_document(doc) and doc not in elastic_docs:
+                    publish_document(doc, es_creds, to)
+    finally:
+        fdocs.close()
+        if os.path.exists(tmp_docs_file):
+            os.remove(tmp_docs_file)
 
 
 if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
-    parser.add_argument('-od', '--output-destination',
-                        default='elasticsearch',
-                        choices=('elasticsearch', 'stdout'),
-                        help='defaults to elasticsearch')
-
-    parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
-                        help='get entries old at most N days from mongodb and'
-                             ' parse those that are not already in elasticsearch.'
-                             ' If not present, will get everything from mongodb, which is the default')
-
-    parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
-                        help='the url of elasticsearch, defaults to http://localhost:9200')
-
-    parser.add_argument('-u', '--elasticsearch-username',
-                        help='the username for elasticsearch')
-
-    parser.add_argument('-p', '--elasticsearch-password',
-                        help='the password for elasticsearch')
-
-    parser.add_argument('-m', '--mongodb-url', default='http://localhost:8082',
-                        help='the url of mongodb, defaults to http://localhost:8082')
-
-    args = parser.parse_args()
-    base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/results/mongo2elastic')
-    output_destination = args.output_destination
+    base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
+    to = args.output_destination
     days = args.merge_latest
-    es_user = args.elasticsearch_username
-    es_passwd = args.elasticsearch_password
+    es_creds = args.elasticsearch_username
 
-    if output_destination == 'elasticsearch':
-        output_destination = base_elastic_url
+    if to == 'elasticsearch':
+        to = base_elastic_url
 
-    # parsed_test_results will be printed/sent to elasticsearch
-    if days == 0:
-        # TODO get everything from mongo
-        publish_mongo_data(output_destination)
-    elif days > 0:
-        body = '''{{
-    "query" : {{
-        "range" : {{
-            "creation_date" : {{
-                "gte" : "now-{}d"
-            }}
-        }}
-    }}
-}}'''.format(days)
-        elastic_data = shared_utils.get_elastic_data(base_elastic_url, es_user, es_passwd, body)
-        logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_data)))
-        mongo_data = get_mongo_data(days)
-        publish_difference(mongo_data, elastic_data, output_destination, es_user, es_passwd)
-    else:
-        raise Exception('Update must be non-negative')
+    export_documents(days)
+    elastic_docs = shared_utils.get_elastic_docs_by_days(base_elastic_url, es_creds, days)
+    logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_docs)))
+    publish_nonexist_documents(elastic_docs, es_creds, to)