12 logger = logging.getLogger('mongo_to_elasticsearch')
13 logger.setLevel(logging.DEBUG)
14 file_handler = logging.FileHandler('/var/log/{}.log'.format(__name__))
15 file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
16 logger.addHandler(file_handler)
19 def _get_dicts_from_list(dict_list, keys):
21 for dictionary in dict_list:
22 # iterate over dictionaries in input list
23 if keys == set(dictionary.keys()):
24 # check the dictionary structure
25 dicts.append(dictionary)
29 def _get_results_from_list_of_dicts(list_of_dict_statuses, dict_indexes, expected_results=None):
31 for test_status in list_of_dict_statuses:
33 for index in dict_indexes:
34 status = status[index]
35 if status in test_results:
36 test_results[status] += 1
38 test_results[status] = 1
40 if expected_results is not None:
41 for expected_result in expected_results:
42 if expected_result not in test_results:
43 test_results[expected_result] = 0
48 def _convert_value(value):
49 return value if value != '' else 0
52 def _convert_duration(duration):
53 if (isinstance(duration, str) or isinstance(duration, unicode)) and ':' in duration:
54 hours, minutes, seconds = duration.split(":")
55 hours = _convert_value(hours)
56 minutes = _convert_value(minutes)
57 seconds = _convert_value(seconds)
58 int_duration = 3600 * int(hours) + 60 * int(minutes) + float(seconds)
60 int_duration = duration
64 def modify_functest_tempest(testcase):
65 if modify_default_entry(testcase):
66 testcase_details = testcase['details']
67 testcase_tests = float(testcase_details['tests'])
68 testcase_failures = float(testcase_details['failures'])
69 if testcase_tests != 0:
70 testcase_details['success_percentage'] = 100 * (testcase_tests - testcase_failures) / testcase_tests
72 testcase_details['success_percentage'] = 0
78 def modify_functest_vims(testcase):
81 details.sig_test.result.[{result}]
82 details.sig_test.duration
84 details.orchestrator.duration
86 Find data for these fields
87 -> details.sig_test.duration
88 -> details.sig_test.tests
89 -> details.sig_test.failures
90 -> details.sig_test.passed
91 -> details.sig_test.skipped
92 -> details.vIMS.duration
93 -> details.orchestrator.duration
95 testcase_details = testcase['details']
96 sig_test_results = _get_dicts_from_list(testcase_details['sig_test']['result'],
97 {'duration', 'result', 'name', 'error'})
98 if len(sig_test_results) < 1:
99 logger.info("No 'result' from 'sig_test' found in vIMS details, skipping")
102 test_results = _get_results_from_list_of_dicts(sig_test_results, ('result',), ('Passed', 'Skipped', 'Failed'))
103 passed = test_results['Passed']
104 skipped = test_results['Skipped']
105 failures = test_results['Failed']
106 all_tests = passed + skipped + failures
107 testcase['details'] = {
109 'duration': testcase_details['sig_test']['duration'],
111 'failures': failures,
116 'duration': testcase_details['vIMS']['duration']
119 'duration': testcase_details['orchestrator']['duration']
125 def modify_functest_onos(testcase):
128 details.FUNCvirNet.duration
129 details.FUNCvirNet.status.[{Case result}]
130 details.FUNCvirNetL3.duration
131 details.FUNCvirNetL3.status.[{Case result}]
133 Find data for these fields
134 -> details.FUNCvirNet.duration
135 -> details.FUNCvirNet.tests
136 -> details.FUNCvirNet.failures
137 -> details.FUNCvirNetL3.duration
138 -> details.FUNCvirNetL3.tests
139 -> details.FUNCvirNetL3.failures
141 testcase_details = testcase['details']
143 funcvirnet_details = testcase_details['FUNCvirNet']['status']
144 funcvirnet_statuses = _get_dicts_from_list(funcvirnet_details, {'Case result', 'Case name:'})
146 funcvirnetl3_details = testcase_details['FUNCvirNetL3']['status']
147 funcvirnetl3_statuses = _get_dicts_from_list(funcvirnetl3_details, {'Case result', 'Case name:'})
149 if len(funcvirnet_statuses) < 0:
150 logger.info("No results found in 'FUNCvirNet' part of ONOS results")
152 elif len(funcvirnetl3_statuses) < 0:
153 logger.info("No results found in 'FUNCvirNetL3' part of ONOS results")
156 funcvirnet_results = _get_results_from_list_of_dicts(funcvirnet_statuses,
157 ('Case result',), ('PASS', 'FAIL'))
158 funcvirnetl3_results = _get_results_from_list_of_dicts(funcvirnetl3_statuses,
159 ('Case result',), ('PASS', 'FAIL'))
161 funcvirnet_passed = funcvirnet_results['PASS']
162 funcvirnet_failed = funcvirnet_results['FAIL']
163 funcvirnet_all = funcvirnet_passed + funcvirnet_failed
165 funcvirnetl3_passed = funcvirnetl3_results['PASS']
166 funcvirnetl3_failed = funcvirnetl3_results['FAIL']
167 funcvirnetl3_all = funcvirnetl3_passed + funcvirnetl3_failed
169 testcase_details['FUNCvirNet'] = {
170 'duration': _convert_duration(testcase_details['FUNCvirNet']['duration']),
171 'tests': funcvirnet_all,
172 'failures': funcvirnet_failed
175 testcase_details['FUNCvirNetL3'] = {
176 'duration': _convert_duration(testcase_details['FUNCvirNetL3']['duration']),
177 'tests': funcvirnetl3_all,
178 'failures': funcvirnetl3_failed
184 def modify_functest_rally(testcase):
187 details.[{summary.duration}]
188 details.[{summary.nb success}]
189 details.[{summary.nb tests}]
191 Find data for these fields
194 -> details.success_percentage
196 summaries = _get_dicts_from_list(testcase['details'], {'summary'})
198 if len(summaries) != 1:
199 logger.info("Found zero or more than one 'summaries' in Rally details, skipping")
202 summary = summaries[0]['summary']
203 testcase['details'] = {
204 'duration': summary['duration'],
205 'tests': summary['nb tests'],
206 'success_percentage': summary['nb success']
211 def modify_functest_odl(testcase):
214 details.details.[{test_status.@status}]
216 Find data for these fields
219 -> details.success_percentage?
221 test_statuses = _get_dicts_from_list(testcase['details']['details'], {'test_status', 'test_doc', 'test_name'})
222 if len(test_statuses) < 1:
223 logger.info("No 'test_status' found in ODL details, skipping")
226 test_results = _get_results_from_list_of_dicts(test_statuses, ('test_status', '@status'), ('PASS', 'FAIL'))
228 passed_tests = test_results['PASS']
229 failed_tests = test_results['FAIL']
230 all_tests = passed_tests + failed_tests
232 testcase['details'] = {
234 'failures': failed_tests,
235 'success_percentage': 100 * passed_tests / float(all_tests)
240 def modify_default_entry(testcase):
242 Look for these and leave any of those:
247 If none are present, then return False
250 testcase_details = testcase['details']
251 fields = ['duration', 'tests', 'failures']
252 if isinstance(testcase_details, dict):
253 for key, value in testcase_details.items():
256 if key == 'duration':
257 testcase_details[key] = _convert_duration(value)
259 del testcase_details[key]
264 def _fix_date(date_string):
265 if isinstance(date_string, dict):
266 return date_string['$date']
268 return date_string[:-3].replace(' ', 'T') + 'Z'
271 def verify_mongo_entry(testcase):
282 these fields must be present and must NOT be None
287 these fields will be preserved if the are NOT None
289 mandatory_fields = ['installer',
295 mandatory_fields_to_modify = {'creation_date': _fix_date}
296 if '_id' in testcase:
297 mongo_id = testcase['_id']
300 optional_fields = ['description']
301 for key, value in testcase.items():
302 if key in mandatory_fields:
304 # empty mandatory field, invalid input
305 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
306 " for mandatory field '{}'".format(mongo_id, key))
309 mandatory_fields.remove(key)
310 elif key in mandatory_fields_to_modify:
312 # empty mandatory field, invalid input
313 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
314 " for mandatory field '{}'".format(mongo_id, key))
317 testcase[key] = mandatory_fields_to_modify[key](value)
318 del mandatory_fields_to_modify[key]
319 elif key in optional_fields:
321 # empty optional field, remove
323 optional_fields.remove(key)
328 if len(mandatory_fields) > 0:
329 # some mandatory fields are missing
330 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
331 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
337 def modify_mongo_entry(testcase):
338 # 1. verify and identify the testcase
339 # 2. if modification is implemented, then use that
340 # 3. if not, try to use default
341 # 4. if 2 or 3 is successful, return True, otherwise return False
342 if verify_mongo_entry(testcase):
343 project = testcase['project_name']
344 case_name = testcase['case_name']
345 if project == 'functest':
346 if case_name == 'Rally':
347 return modify_functest_rally(testcase)
348 elif case_name == 'ODL':
349 return modify_functest_odl(testcase)
350 elif case_name == 'ONOS':
351 return modify_functest_onos(testcase)
352 elif case_name == 'vIMS':
353 return modify_functest_vims(testcase)
354 elif case_name == 'Tempest':
355 return modify_functest_tempest(testcase)
356 return modify_default_entry(testcase)
361 def publish_mongo_data(output_destination):
362 tmp_filename = 'mongo-{}.log'.format(uuid.uuid4())
364 subprocess.check_call(['mongoexport', '--db', 'test_results_collection', '-c', 'test_results', '--out',
366 with open(tmp_filename) as fobj:
367 for mongo_json_line in fobj:
368 test_result = json.loads(mongo_json_line)
369 if modify_mongo_entry(test_result):
370 shared_utils.publish_json(test_result, output_destination, es_user, es_passwd)
372 if os.path.exists(tmp_filename):
373 os.remove(tmp_filename)
376 def get_mongo_data(days):
377 past_time = datetime.datetime.today() - datetime.timedelta(days=days)
378 mongo_json_lines = subprocess.check_output(['mongoexport', '--db', 'test_results_collection', '-c', 'test_results',
379 '--query', '{{"creation_date":{{$gt:"{}"}}}}'
380 .format(past_time)]).splitlines()
383 for mongo_json_line in mongo_json_lines:
384 test_result = json.loads(mongo_json_line)
385 if modify_mongo_entry(test_result):
386 # if the modification could be applied, append the modified result
387 mongo_data.append(test_result)
391 def publish_difference(mongo_data, elastic_data, output_destination, es_user, es_passwd):
392 for elastic_entry in elastic_data:
393 if elastic_entry in mongo_data:
394 mongo_data.remove(elastic_entry)
396 logger.info('number of parsed test results: {}'.format(len(mongo_data)))
398 for parsed_test_result in mongo_data:
399 shared_utils.publish_json(parsed_test_result, es_user, es_passwd, output_destination)
402 if __name__ == '__main__':
403 parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
404 parser.add_argument('-od', '--output-destination',
405 default='elasticsearch',
406 choices=('elasticsearch', 'stdout'),
407 help='defaults to elasticsearch')
409 parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
410 help='get entries old at most N days from mongodb and'
411 ' parse those that are not already in elasticsearch.'
412 ' If not present, will get everything from mongodb, which is the default')
414 parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
415 help='the url of elasticsearch, defaults to http://localhost:9200')
417 parser.add_argument('-u', '--elasticsearch-username',
418 help='the username for elasticsearch')
420 parser.add_argument('-p', '--elasticsearch-password',
421 help='the password for elasticsearch')
423 parser.add_argument('-m', '--mongodb-url', default='http://localhost:8082',
424 help='the url of mongodb, defaults to http://localhost:8082')
426 args = parser.parse_args()
427 base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
428 output_destination = args.output_destination
429 days = args.merge_latest
430 es_user = args.elasticsearch_username
431 es_passwd = args.elasticsearch_password
433 if output_destination == 'elasticsearch':
434 output_destination = base_elastic_url
436 # parsed_test_results will be printed/sent to elasticsearch
438 # TODO get everything from mongo
439 publish_mongo_data(output_destination)
450 elastic_data = shared_utils.get_elastic_data(base_elastic_url, es_user, es_passwd, body)
451 logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_data)))
452 mongo_data = get_mongo_data(days)
453 publish_difference(mongo_data, elastic_data, output_destination, es_user, es_passwd)
455 raise Exception('Update must be non-negative')