15 logger = logging.getLogger('mongo_to_elasticsearch')
16 logger.setLevel(logging.DEBUG)
17 file_handler = logging.FileHandler('/var/log/{}.log'.format('mongo_to_elasticsearch'))
18 file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
19 logger.addHandler(file_handler)
22 def _get_dicts_from_list(testcase, dict_list, keys):
24 for dictionary in dict_list:
25 # iterate over dictionaries in input list
26 if not isinstance(dictionary, dict):
27 logger.info("Skipping non-dict details testcase '{}'".format(testcase))
29 if keys == set(dictionary.keys()):
30 # check the dictionary structure
31 dicts.append(dictionary)
35 def _get_results_from_list_of_dicts(list_of_dict_statuses, dict_indexes, expected_results=None):
37 for test_status in list_of_dict_statuses:
39 for index in dict_indexes:
40 status = status[index]
41 if status in test_results:
42 test_results[status] += 1
44 test_results[status] = 1
46 if expected_results is not None:
47 for expected_result in expected_results:
48 if expected_result not in test_results:
49 test_results[expected_result] = 0
54 def _convert_value(value):
55 return value if value != '' else 0
58 def _convert_duration(duration):
59 if (isinstance(duration, str) or isinstance(duration, unicode)) and ':' in duration:
60 hours, minutes, seconds = duration.split(":")
61 hours = _convert_value(hours)
62 minutes = _convert_value(minutes)
63 seconds = _convert_value(seconds)
64 int_duration = 3600 * int(hours) + 60 * int(minutes) + float(seconds)
66 int_duration = duration
70 def modify_functest_tempest(testcase):
71 if modify_default_entry(testcase):
72 testcase_details = testcase['details']
73 testcase_tests = float(testcase_details['tests'])
74 testcase_failures = float(testcase_details['failures'])
75 if testcase_tests != 0:
76 testcase_details['success_percentage'] = 100 * (testcase_tests - testcase_failures) / testcase_tests
78 testcase_details['success_percentage'] = 0
84 def modify_functest_vims(testcase):
87 details.sig_test.result.[{result}]
88 details.sig_test.duration
90 details.orchestrator.duration
92 Find data for these fields
93 -> details.sig_test.duration
94 -> details.sig_test.tests
95 -> details.sig_test.failures
96 -> details.sig_test.passed
97 -> details.sig_test.skipped
98 -> details.vIMS.duration
99 -> details.orchestrator.duration
101 testcase_details = testcase['details']
102 sig_test_results = _get_dicts_from_list(testcase, testcase_details['sig_test']['result'],
103 {'duration', 'result', 'name', 'error'})
104 if len(sig_test_results) < 1:
105 logger.info("No 'result' from 'sig_test' found in vIMS details, skipping")
108 test_results = _get_results_from_list_of_dicts(sig_test_results, ('result',), ('Passed', 'Skipped', 'Failed'))
109 passed = test_results['Passed']
110 skipped = test_results['Skipped']
111 failures = test_results['Failed']
112 all_tests = passed + skipped + failures
113 testcase['details'] = {
115 'duration': testcase_details['sig_test']['duration'],
117 'failures': failures,
122 'duration': testcase_details['vIMS']['duration']
125 'duration': testcase_details['orchestrator']['duration']
131 def modify_functest_onos(testcase):
134 details.FUNCvirNet.duration
135 details.FUNCvirNet.status.[{Case result}]
136 details.FUNCvirNetL3.duration
137 details.FUNCvirNetL3.status.[{Case result}]
139 Find data for these fields
140 -> details.FUNCvirNet.duration
141 -> details.FUNCvirNet.tests
142 -> details.FUNCvirNet.failures
143 -> details.FUNCvirNetL3.duration
144 -> details.FUNCvirNetL3.tests
145 -> details.FUNCvirNetL3.failures
147 testcase_details = testcase['details']
149 if 'FUNCvirNet' not in testcase_details:
150 return modify_default_entry(testcase)
152 funcvirnet_details = testcase_details['FUNCvirNet']['status']
153 funcvirnet_statuses = _get_dicts_from_list(testcase, funcvirnet_details, {'Case result', 'Case name:'})
155 funcvirnetl3_details = testcase_details['FUNCvirNetL3']['status']
156 funcvirnetl3_statuses = _get_dicts_from_list(testcase, funcvirnetl3_details, {'Case result', 'Case name:'})
158 if len(funcvirnet_statuses) < 0:
159 logger.info("No results found in 'FUNCvirNet' part of ONOS results")
161 elif len(funcvirnetl3_statuses) < 0:
162 logger.info("No results found in 'FUNCvirNetL3' part of ONOS results")
165 funcvirnet_results = _get_results_from_list_of_dicts(funcvirnet_statuses,
166 ('Case result',), ('PASS', 'FAIL'))
167 funcvirnetl3_results = _get_results_from_list_of_dicts(funcvirnetl3_statuses,
168 ('Case result',), ('PASS', 'FAIL'))
170 funcvirnet_passed = funcvirnet_results['PASS']
171 funcvirnet_failed = funcvirnet_results['FAIL']
172 funcvirnet_all = funcvirnet_passed + funcvirnet_failed
174 funcvirnetl3_passed = funcvirnetl3_results['PASS']
175 funcvirnetl3_failed = funcvirnetl3_results['FAIL']
176 funcvirnetl3_all = funcvirnetl3_passed + funcvirnetl3_failed
178 testcase_details['FUNCvirNet'] = {
179 'duration': _convert_duration(testcase_details['FUNCvirNet']['duration']),
180 'tests': funcvirnet_all,
181 'failures': funcvirnet_failed
184 testcase_details['FUNCvirNetL3'] = {
185 'duration': _convert_duration(testcase_details['FUNCvirNetL3']['duration']),
186 'tests': funcvirnetl3_all,
187 'failures': funcvirnetl3_failed
193 def modify_functest_rally(testcase):
196 details.[{summary.duration}]
197 details.[{summary.nb success}]
198 details.[{summary.nb tests}]
200 Find data for these fields
203 -> details.success_percentage
205 summaries = _get_dicts_from_list(testcase, testcase['details'], {'summary'})
207 if len(summaries) != 1:
208 logger.info("Found zero or more than one 'summaries' in Rally details, skipping")
211 summary = summaries[0]['summary']
212 testcase['details'] = {
213 'duration': summary['duration'],
214 'tests': summary['nb tests'],
215 'success_percentage': summary['nb success']
220 def modify_functest_odl(testcase):
223 details.details.[{test_status.@status}]
225 Find data for these fields
228 -> details.success_percentage?
230 test_statuses = _get_dicts_from_list(testcase, testcase['details']['details'],
231 {'test_status', 'test_doc', 'test_name'})
232 if len(test_statuses) < 1:
233 logger.info("No 'test_status' found in ODL details, skipping")
236 test_results = _get_results_from_list_of_dicts(test_statuses, ('test_status', '@status'), ('PASS', 'FAIL'))
238 passed_tests = test_results['PASS']
239 failed_tests = test_results['FAIL']
240 all_tests = passed_tests + failed_tests
242 testcase['details'] = {
244 'failures': failed_tests,
245 'success_percentage': 100 * passed_tests / float(all_tests)
247 logger.debug("Modified odl testcase: '{}'".format(json.dumps(testcase, indent=2)))
251 def modify_default_entry(testcase):
253 Look for these and leave any of those:
258 If none are present, then return False
261 testcase_details = testcase['details']
262 fields = ['duration', 'tests', 'failures']
263 if isinstance(testcase_details, dict):
264 for key, value in testcase_details.items():
267 if key == 'duration':
268 testcase_details[key] = _convert_duration(value)
270 del testcase_details[key]
275 def _fix_date(date_string):
276 if isinstance(date_string, dict):
277 return date_string['$date']
279 return date_string[:-3].replace(' ', 'T') + 'Z'
282 def verify_mongo_entry(testcase):
293 these fields must be present and must NOT be None
298 these fields will be preserved if the are NOT None
300 mandatory_fields = ['installer',
306 mandatory_fields_to_modify = {'start_date': _fix_date}
307 fields_to_swap_or_add = {'scenario': 'version'}
308 if '_id' in testcase:
309 mongo_id = testcase['_id']
312 optional_fields = ['description']
313 for key, value in testcase.items():
314 if key in mandatory_fields:
316 # empty mandatory field, invalid input
317 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
318 " for mandatory field '{}'".format(mongo_id, key))
321 mandatory_fields.remove(key)
322 elif key in mandatory_fields_to_modify:
324 # empty mandatory field, invalid input
325 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
326 " for mandatory field '{}'".format(mongo_id, key))
329 testcase[key] = mandatory_fields_to_modify[key](value)
330 del mandatory_fields_to_modify[key]
331 elif key in fields_to_swap_or_add:
333 swapped_key = fields_to_swap_or_add[key]
334 swapped_value = testcase[swapped_key]
335 logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key, swapped_value))
336 testcase[key] = swapped_value
337 del fields_to_swap_or_add[key]
339 del fields_to_swap_or_add[key]
340 elif key in optional_fields:
342 # empty optional field, remove
344 optional_fields.remove(key)
349 if len(mandatory_fields) > 0:
350 # some mandatory fields are missing
351 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
352 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
354 elif len(mandatory_fields_to_modify) > 0:
355 # some mandatory fields are missing
356 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
357 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
360 if len(fields_to_swap_or_add) > 0:
361 for key, swap_key in fields_to_swap_or_add.iteritems():
362 testcase[key] = testcase[swap_key]
367 def modify_mongo_entry(testcase):
368 # 1. verify and identify the testcase
369 # 2. if modification is implemented, then use that
370 # 3. if not, try to use default
371 # 4. if 2 or 3 is successful, return True, otherwise return False
372 if verify_mongo_entry(testcase):
373 project = testcase['project_name']
374 case_name = testcase['case_name']
375 logger.info("Processing mongo test case '{}'".format(case_name))
377 if project == 'functest':
378 if case_name == 'rally_sanity':
379 return modify_functest_rally(testcase)
380 elif case_name.lower() == 'odl':
381 return modify_functest_odl(testcase)
382 elif case_name.lower() == 'onos':
383 return modify_functest_onos(testcase)
384 elif case_name.lower() == 'vims':
385 return modify_functest_vims(testcase)
386 elif case_name == 'tempest_smoke_serial':
387 return modify_functest_tempest(testcase)
388 return modify_default_entry(testcase)
390 logger.error("Fail in modify testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
395 def publish_mongo_data(output_destination):
396 tmp_filename = 'mongo-{}.log'.format(uuid.uuid4())
398 subprocess.check_call(['mongoexport', '--db', 'test_results_collection', '-c', 'results', '--out',
400 with open(tmp_filename) as fobj:
401 for mongo_json_line in fobj:
402 test_result = json.loads(mongo_json_line)
403 if modify_mongo_entry(test_result):
404 status, data = shared_utils.publish_json(test_result, es_creds, output_destination)
406 project = test_result['project_name']
407 case_name = test_result['case_name']
408 logger.info('project {} case {} publish failed, due to [{}]'
409 .format(project, case_name, json.loads(data)['error']['reason']))
411 if os.path.exists(tmp_filename):
412 os.remove(tmp_filename)
415 def get_mongo_data(days):
416 past_time = datetime.datetime.today() - datetime.timedelta(days=days)
417 mongo_json_lines = subprocess.check_output(['mongoexport', '--db', 'test_results_collection', '-c', 'results',
418 '--query', '{{"start_date":{{$gt:"{}"}}}}'
419 .format(past_time)]).splitlines()
422 for mongo_json_line in mongo_json_lines:
423 test_result = json.loads(mongo_json_line)
424 if modify_mongo_entry(test_result):
425 # if the modification could be applied, append the modified result
426 mongo_data.append(test_result)
430 def publish_difference(mongo_data, elastic_data, output_destination, es_creds):
431 for elastic_entry in elastic_data:
432 if elastic_entry in mongo_data:
433 mongo_data.remove(elastic_entry)
435 logger.info('number of parsed test results: {}'.format(len(mongo_data)))
437 for parsed_test_result in mongo_data:
438 shared_utils.publish_json(parsed_test_result, es_creds, output_destination)
441 if __name__ == '__main__':
442 parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
443 parser.add_argument('-od', '--output-destination',
444 default='elasticsearch',
445 choices=('elasticsearch', 'stdout'),
446 help='defaults to elasticsearch')
448 parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
449 help='get entries old at most N days from mongodb and'
450 ' parse those that are not already in elasticsearch.'
451 ' If not present, will get everything from mongodb, which is the default')
453 parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
454 help='the url of elasticsearch, defaults to http://localhost:9200')
456 parser.add_argument('-u', '--elasticsearch-username', default=None,
457 help='The username with password for elasticsearch in format username:password')
459 args = parser.parse_args()
460 base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
461 output_destination = args.output_destination
462 days = args.merge_latest
463 es_creds = args.elasticsearch_username
465 if output_destination == 'elasticsearch':
466 output_destination = base_elastic_url
468 # parsed_test_results will be printed/sent to elasticsearch
470 publish_mongo_data(output_destination)
481 elastic_data = shared_utils.get_elastic_data(base_elastic_url, es_creds, body)
482 logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_data)))
483 mongo_data = get_mongo_data(days)
484 publish_difference(mongo_data, elastic_data, output_destination, es_creds)
486 raise Exception('Update must be non-negative')