merge N days and whole database process and not publish existed data any way
[releng.git] / utils / test / scripts / mongo_to_elasticsearch.py
index ea515bc..b722793 100644 (file)
@@ -16,6 +16,27 @@ import shared_utils
 
 logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get
 
+parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
+parser.add_argument('-od', '--output-destination',
+                    default='elasticsearch',
+                    choices=('elasticsearch', 'stdout'),
+                    help='defaults to elasticsearch')
+
+parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
+                    help='get entries old at most N days from mongodb and'
+                         ' parse those that are not already in elasticsearch.'
+                         ' If not present, will get everything from mongodb, which is the default')
+
+parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
+                    help='the url of elasticsearch, defaults to http://localhost:9200')
+
+parser.add_argument('-u', '--elasticsearch-username', default=None,
+                    help='The username with password for elasticsearch in format username:password')
+
+args = parser.parse_args()
+
+tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
+
 
 def _fix_date(date_string):
     if isinstance(date_string, dict):
@@ -24,7 +45,7 @@ def _fix_date(date_string):
         return date_string[:-3].replace(' ', 'T') + 'Z'
 
 
-def verify_mongo_entry(testcase):
+def verify_document(testcase):
     """
     Mandatory fields:
         installer
@@ -109,12 +130,12 @@ def verify_mongo_entry(testcase):
         return True
 
 
-def modify_mongo_entry(testcase):
+def format_document(testcase):
     # 1. verify and identify the testcase
     # 2. if modification is implemented, then use that
     # 3. if not, try to use default
     # 4. if 2 or 3 is successful, return True, otherwise return False
-    if verify_mongo_entry(testcase):
+    if verify_document(testcase):
         project = testcase['project_name']
         case_name = testcase['case_name']
         fmt = conf_utils.get_format(project, case_name)
@@ -123,103 +144,56 @@ def modify_mongo_entry(testcase):
                 logger.info("Processing %s/%s using format %s" % (project, case_name, fmt))
                 return vars(mongo2elastic_format)[fmt](testcase)
             except Exception:
-                logger.error("Fail in modify testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
+                logger.error("Fail in format testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
+                return False
     else:
         return False
 
 
-def publish_mongo_data(output_destination):
-    tmp_filename = 'mongo-{}.log'.format(uuid.uuid4())
-    try:
-        subprocess.check_call(['mongoexport',
-                               '--db', 'test_results_collection',
-                               '-c', 'results',
-                               '--out', tmp_filename])
-        with open(tmp_filename) as fobj:
-            for mongo_json_line in fobj:
-                test_result = json.loads(mongo_json_line)
-                if modify_mongo_entry(test_result):
-                    status, data = shared_utils.publish_json(test_result, es_creds, output_destination)
-                    if status > 300:
-                        project = test_result['project_name']
-                        case_name = test_result['case_name']
-                        logger.info('project {} case {} publish failed, due to [{}]'
-                                    .format(project, case_name, json.loads(data)['error']['reason']))
-    finally:
-        if os.path.exists(tmp_filename):
-            os.remove(tmp_filename)
-
+def export_documents(days):
+    cmd = ['mongoexport', '--db', 'test_results_collection', '-c', 'results']
+    if days > 0:
+        past_time = datetime.datetime.today() - datetime.timedelta(days=days)
+        cmd += ['--query', '{{"start_date":{{$gt:"{}"}}}}'.format(past_time)]
+    cmd += [ '--out', '{}'.format(tmp_docs_file)]
 
-def get_mongo_data(days):
-    past_time = datetime.datetime.today() - datetime.timedelta(days=days)
-    mongo_json_lines = subprocess.check_output(['mongoexport', '--db', 'test_results_collection', '-c', 'results',
-                                                '--query', '{{"start_date":{{$gt:"{}"}}}}'
-                                               .format(past_time)]).splitlines()
-
-    mongo_data = []
-    for mongo_json_line in mongo_json_lines:
-        test_result = json.loads(mongo_json_line)
-        if modify_mongo_entry(test_result):
-            # if the modification could be applied, append the modified result
-            mongo_data.append(test_result)
-    return mongo_data
+    try:
+        subprocess.check_call(cmd)
+    except Exception, err:
+        logger.error("export mongodb failed: %s" % err)
+        exit(-1)
 
 
-def publish_difference(mongo_data, elastic_data, output_destination, es_creds):
-    for elastic_entry in elastic_data:
-        if elastic_entry in mongo_data:
-            mongo_data.remove(elastic_entry)
+def publish_document(document, es_creds, to):
+    status, data = shared_utils.publish_json(document, es_creds, to)
+    if status > 300:
+        logger.error('Publish record[{}] failed, due to [{}]'
+                    .format(document, json.loads(data)['error']['reason']))
 
-    logger.info('number of parsed test results: {}'.format(len(mongo_data)))
 
-    for parsed_test_result in mongo_data:
-        shared_utils.publish_json(parsed_test_result, es_creds, output_destination)
+def publish_nonexist_documents(elastic_docs, es_creds, to):
+    try:
+        with open(tmp_docs_file) as fdocs:
+            for doc_line in fdocs:
+                doc = json.loads(doc_line)
+                if format_document(doc) and doc not in elastic_docs:
+                    publish_document(doc, es_creds, to)
+    finally:
+        fdocs.close()
+        if os.path.exists(tmp_docs_file):
+            os.remove(tmp_docs_file)
 
 
 if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
-    parser.add_argument('-od', '--output-destination',
-                        default='elasticsearch',
-                        choices=('elasticsearch', 'stdout'),
-                        help='defaults to elasticsearch')
-
-    parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
-                        help='get entries old at most N days from mongodb and'
-                             ' parse those that are not already in elasticsearch.'
-                             ' If not present, will get everything from mongodb, which is the default')
-
-    parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
-                        help='the url of elasticsearch, defaults to http://localhost:9200')
-
-    parser.add_argument('-u', '--elasticsearch-username', default=None,
-                        help='The username with password for elasticsearch in format username:password')
-
-    args = parser.parse_args()
     base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
-    output_destination = args.output_destination
+    to = args.output_destination
     days = args.merge_latest
     es_creds = args.elasticsearch_username
 
-    if output_destination == 'elasticsearch':
-        output_destination = base_elastic_url
-
-    # parsed_test_results will be printed/sent to elasticsearch
-    if days == 0:
-        publish_mongo_data(output_destination)
-    elif days > 0:
-        body = '''{{
-    "query" : {{
-        "range" : {{
-            "start_date" : {{
-                "gte" : "now-{}d"
-            }}
-        }}
-    }}
-}}'''.format(days)
-        elastic_data = shared_utils.get_elastic_data(base_elastic_url, es_creds, body)
-        logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_data)))
-        mongo_data = get_mongo_data(days)
-        publish_difference(mongo_data, elastic_data, output_destination, es_creds)
-    else:
-        raise Exception('Update must be non-negative')
+    if to == 'elasticsearch':
+        to = base_elastic_url
 
+    export_documents(days)
+    elastic_docs = shared_utils.get_elastic_docs_by_days(base_elastic_url, es_creds, days)
+    logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_docs)))
+    publish_nonexist_documents(elastic_docs, es_creds, to)