#! /usr/bin/env python

import argparse
import datetime
import json
import os
import subprocess
import traceback
import urlparse
import uuid

import conf_utils
import logger_utils
import mongo2elastic_format
import shared_utils
# Dashboard logger shared by the whole module.
logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get

# Command-line interface; parsed at import time so that both the classes
# below and the __main__ section can read the resulting 'args'.
parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
parser.add_argument('-od', '--output-destination',
                    default='elasticsearch',
                    choices=('elasticsearch', 'stdout'),
                    help='defaults to elasticsearch')
parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
                    help='get entries old at most N days from mongodb and'
                         ' parse those that are not already in elasticsearch.'
                         ' If not present, will get everything from mongodb, which is the default')
parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
                    help='the url of elasticsearch, defaults to http://localhost:9200')
parser.add_argument('-u', '--elasticsearch-username', default=None,
                    help='The username with password for elasticsearch in format username:password')
args = parser.parse_args()
# Scratch file that receives the mongoexport dump; a uuid4 suffix keeps
# concurrent runs from clobbering each other.  The file is deleted again
# after publishing (see DocumentsPublisher._remove in this module).
tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
class DocumentPublisher:
    """Verifies, formats and publishes a single mongodb document.

    The document is first verified (mandatory fields present and not None),
    then passed through the formatter function named *fmt* (looked up in the
    mongo2elastic_format module), and finally published to *to* unless an
    identical document is already present in *exist_docs*.
    """

    def __init__(self, doc, fmt, exist_docs, creds, to):
        self.doc = doc                  # the decoded mongodb document (dict)
        self.fmt = fmt                  # formatter name in mongo2elastic_format, or falsy
        self.creds = creds              # elasticsearch credentials ('user:password' or None)
        self.exist_docs = exist_docs    # documents already present in elasticsearch
        self.to = to                    # destination url or 'stdout'
        self.is_formatted = True

    def format(self):
        """Verify and format self.doc in place; always return self.

        Any exception raised while formatting is logged and recorded as
        is_formatted = False instead of propagating to the caller.
        """
        try:
            if self._verify_document() and self.fmt:
                self.is_formatted = vars(mongo2elastic_format)[self.fmt](self.doc)
            else:
                self.is_formatted = False
        except Exception:
            logger.error("Fail in format testcase[%s]\nerror message: %s" %
                         (self.doc, traceback.format_exc()))
            self.is_formatted = False
        # NOTE: the return used to live in a 'finally' block; a return there
        # swallows even SystemExit/KeyboardInterrupt, so it was moved here.
        return self

    def publish(self):
        """Publish the document unless formatting failed or it already exists."""
        if self.is_formatted and self.doc not in self.exist_docs:
            self._publish()

    def _publish(self):
        # shared_utils.publish_json returns (http status, response body).
        status, data = shared_utils.publish_json(self.doc, self.creds, self.to)
        if status > 300:
            logger.error('Publish record[{}] failed, due to [{}]'
                         .format(self.doc, json.loads(data)['error']['reason']))

    def _fix_date(self, date_string):
        # mongoexport emits dates either as {"$date": "..."} or as a plain
        # 'YYYY-MM-DD HH:MM:SS.mmm' string; normalize the latter by dropping
        # the last three characters and switching to 'T'/'Z' notation.
        if isinstance(date_string, dict):
            return date_string['$date']
        else:
            return date_string[:-3].replace(' ', 'T') + 'Z'

    def _verify_document(self):
        """
        Mandatory fields:
            installer
            pod_name
            version
            case_name
            project_name
            start_date
            details

            these fields must be present and must NOT be None

        Optional fields:
            description

            these fields will be preserved if they are NOT None
        """
        mandatory_fields = ['installer',
                            'pod_name',
                            'version',
                            'case_name',
                            'project_name',
                            'details']
        mandatory_fields_to_modify = {'start_date': self._fix_date}
        fields_to_swap_or_add = {'scenario': 'version'}
        if '_id' in self.doc:
            mongo_id = self.doc['_id']
        else:
            mongo_id = None
        optional_fields = ['description']
        # Iterate over a snapshot: the loop deletes keys from self.doc, which
        # raises RuntimeError on a live view under Python 3 (and is fragile
        # under Python 2 as well).
        for key, value in list(self.doc.items()):
            if key in mandatory_fields:
                if value is None:
                    # empty mandatory field, invalid input
                    logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
                                " for mandatory field '{}'".format(mongo_id, key))
                    return False
                else:
                    mandatory_fields.remove(key)
            elif key in mandatory_fields_to_modify:
                if value is None:
                    # empty mandatory field, invalid input
                    logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
                                " for mandatory field '{}'".format(mongo_id, key))
                    return False
                else:
                    self.doc[key] = mandatory_fields_to_modify[key](value)
                    del mandatory_fields_to_modify[key]
            elif key in fields_to_swap_or_add:
                if value is None:
                    swapped_key = fields_to_swap_or_add[key]
                    swapped_value = self.doc[swapped_key]
                    logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key,
                                                                                                       swapped_value))
                    self.doc[key] = swapped_value
                    del fields_to_swap_or_add[key]
                else:
                    del fields_to_swap_or_add[key]
            elif key in optional_fields:
                if value is None:
                    # empty optional field, remove
                    del self.doc[key]
                optional_fields.remove(key)
            else:
                # unknown field
                del self.doc[key]

        if len(mandatory_fields) > 0:
            # some mandatory fields are missing
            logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
                        " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
            return False
        elif len(mandatory_fields_to_modify) > 0:
            # some mandatory fields are missing
            logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
                        " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
            return False
        else:
            # Swap fields never seen in the document are added outright.
            # (.items() instead of the py2-only .iteritems(); works on both.)
            for key, swap_key in fields_to_swap_or_add.items():
                self.doc[key] = self.doc[swap_key]
            return True
+
+
class DocumentsPublisher:
    """Exports the results for one (project, case) pair from mongodb and
    publishes every document that is not already in elasticsearch."""

    def __init__(self, project, case, fmt, days, elastic_url, creds, to):
        self.project = project          # project name to query for
        self.case = case                # case name to query for
        self.fmt = fmt                  # formatter name handed to DocumentPublisher
        self.days = days                # 0 = everything, N > 0 = last N days only
        self.elastic_url = elastic_url  # base elasticsearch url
        self.creds = creds              # elasticsearch credentials
        self.to = to                    # destination url or 'stdout'
        self.existed_docs = []

    def export(self):
        """Dump the matching mongodb documents into tmp_docs_file; return self.

        Exits the process with -1 if mongoexport fails.
        """
        # BUG FIX: both date branches used the module-level global 'days'
        # instead of self.days (it only worked by accident via __main__).
        if self.days > 0:
            past_time = datetime.datetime.today() - datetime.timedelta(days=self.days)
            query = '''{{
                "project_name": "{}",
                "case_name": "{}",
                "start_date": {{"$gt" : "{}"}}
            }}'''.format(self.project, self.case, past_time)
        else:
            query = '''{{
                "project_name": "{}",
                "case_name": "{}"
            }}'''.format(self.project, self.case)
        cmd = ['mongoexport',
               '--db', 'test_results_collection',
               '--collection', 'results',
               '--query', '{}'.format(query),
               '--out', '{}'.format(tmp_docs_file)]
        try:
            subprocess.check_call(cmd)
            return self
        except Exception as err:  # 'as' form works on both Python 2 and 3
            logger.error("export mongodb failed: %s" % err)
            self._remove()
            exit(-1)

    def get_existed_docs(self):
        """Load the documents already in elasticsearch for self.days; return self."""
        self.existed_docs = shared_utils.get_elastic_docs_by_days(self.elastic_url,
                                                                  self.creds,
                                                                  self.days)
        return self

    def publish(self):
        """Publish every exported document, then remove the scratch file."""
        try:
            with open(tmp_docs_file) as fdocs:
                for doc_line in fdocs:
                    DocumentPublisher(json.loads(doc_line),
                                      self.fmt,
                                      self.existed_docs,
                                      self.creds,
                                      self.to).format().publish()
        finally:
            # The 'with' block already closes fdocs; the old explicit
            # fdocs.close() here raised NameError whenever open() failed.
            self._remove()

    def _remove(self):
        # Best-effort cleanup of the mongoexport scratch file.
        if os.path.exists(tmp_docs_file):
            os.remove(tmp_docs_file)
if __name__ == '__main__':
    base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
    to = args.output_destination
    days = args.merge_latest
    es_creds = args.elasticsearch_username

    if to == 'elasticsearch':
        to = base_elastic_url

    # Export, de-duplicate and publish every testcase listed in
    # testcases.yaml, project by project.
    for project, case_dicts in conf_utils.testcases_yaml.items():
        for case_dict in case_dicts:
            case = case_dict.get('name')
            fmt = conf_utils.compose_format(case_dict.get('format'))
            DocumentsPublisher(project,
                               case,
                               fmt,
                               days,
                               base_elastic_url,
                               es_creds,
                               to).export().get_existed_docs().publish()