add logger common process for releng scripts
[releng.git] / utils / test / scripts / mongo_to_elasticsearch.py
1 #! /usr/bin/env python
2 import datetime
3 import json
4 import os
5 import subprocess
6 import traceback
7 import urlparse
8 import uuid
9
10 import argparse
11
12 import conf_utils
13 import logger_utils
14 import mongo2elastic_format
15 import shared_utils
16
17 logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get
18
19
20 def _fix_date(date_string):
21     if isinstance(date_string, dict):
22         return date_string['$date']
23     else:
24         return date_string[:-3].replace(' ', 'T') + 'Z'
25
26
27 def verify_mongo_entry(testcase):
28     """
29     Mandatory fields:
30         installer
31         pod_name
32         version
33         case_name
34         date
35         project
36         details
37
38         these fields must be present and must NOT be None
39
40     Optional fields:
41         description
42
43         these fields will be preserved if the are NOT None
44     """
45     mandatory_fields = ['installer',
46                         'pod_name',
47                         'version',
48                         'case_name',
49                         'project_name',
50                         'details']
51     mandatory_fields_to_modify = {'start_date': _fix_date}
52     fields_to_swap_or_add = {'scenario': 'version'}
53     if '_id' in testcase:
54         mongo_id = testcase['_id']
55     else:
56         mongo_id = None
57     optional_fields = ['description']
58     for key, value in testcase.items():
59         if key in mandatory_fields:
60             if value is None:
61                 # empty mandatory field, invalid input
62                 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
63                             " for mandatory field '{}'".format(mongo_id, key))
64                 return False
65             else:
66                 mandatory_fields.remove(key)
67         elif key in mandatory_fields_to_modify:
68             if value is None:
69                 # empty mandatory field, invalid input
70                 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
71                             " for mandatory field '{}'".format(mongo_id, key))
72                 return False
73             else:
74                 testcase[key] = mandatory_fields_to_modify[key](value)
75                 del mandatory_fields_to_modify[key]
76         elif key in fields_to_swap_or_add:
77             if value is None:
78                 swapped_key = fields_to_swap_or_add[key]
79                 swapped_value = testcase[swapped_key]
80                 logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key, swapped_value))
81                 testcase[key] = swapped_value
82                 del fields_to_swap_or_add[key]
83             else:
84                 del fields_to_swap_or_add[key]
85         elif key in optional_fields:
86             if value is None:
87                 # empty optional field, remove
88                 del testcase[key]
89             optional_fields.remove(key)
90         else:
91             # unknown field
92             del testcase[key]
93
94     if len(mandatory_fields) > 0:
95         # some mandatory fields are missing
96         logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
97                     " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
98         return False
99     elif len(mandatory_fields_to_modify) > 0:
100         # some mandatory fields are missing
101         logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
102                     " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
103         return False
104     else:
105         if len(fields_to_swap_or_add) > 0:
106             for key, swap_key in fields_to_swap_or_add.iteritems():
107                 testcase[key] = testcase[swap_key]
108
109         return True
110
111
112 def modify_mongo_entry(testcase):
113     # 1. verify and identify the testcase
114     # 2. if modification is implemented, then use that
115     # 3. if not, try to use default
116     # 4. if 2 or 3 is successful, return True, otherwise return False
117     if verify_mongo_entry(testcase):
118         project = testcase['project_name']
119         case_name = testcase['case_name']
120         fmt = conf_utils.get_format(project, case_name)
121         if fmt:
122             try:
123                 logger.info("Processing %s/%s using format %s" % (project, case_name, fmt))
124                 return vars(mongo2elastic_format)[fmt](testcase)
125             except Exception:
126                 logger.error("Fail in modify testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
127     else:
128         return False
129
130
131 def publish_mongo_data(output_destination):
132     tmp_filename = 'mongo-{}.log'.format(uuid.uuid4())
133     try:
134         subprocess.check_call(['mongoexport',
135                                '--db', 'test_results_collection',
136                                '-c', 'results',
137                                '--out', tmp_filename])
138         with open(tmp_filename) as fobj:
139             for mongo_json_line in fobj:
140                 test_result = json.loads(mongo_json_line)
141                 if modify_mongo_entry(test_result):
142                     status, data = shared_utils.publish_json(test_result, es_creds, output_destination)
143                     if status > 300:
144                         project = test_result['project_name']
145                         case_name = test_result['case_name']
146                         logger.info('project {} case {} publish failed, due to [{}]'
147                                     .format(project, case_name, json.loads(data)['error']['reason']))
148     finally:
149         if os.path.exists(tmp_filename):
150             os.remove(tmp_filename)
151
152
153 def get_mongo_data(days):
154     past_time = datetime.datetime.today() - datetime.timedelta(days=days)
155     mongo_json_lines = subprocess.check_output(['mongoexport', '--db', 'test_results_collection', '-c', 'results',
156                                                 '--query', '{{"start_date":{{$gt:"{}"}}}}'
157                                                .format(past_time)]).splitlines()
158
159     mongo_data = []
160     for mongo_json_line in mongo_json_lines:
161         test_result = json.loads(mongo_json_line)
162         if modify_mongo_entry(test_result):
163             # if the modification could be applied, append the modified result
164             mongo_data.append(test_result)
165     return mongo_data
166
167
168 def publish_difference(mongo_data, elastic_data, output_destination, es_creds):
169     for elastic_entry in elastic_data:
170         if elastic_entry in mongo_data:
171             mongo_data.remove(elastic_entry)
172
173     logger.info('number of parsed test results: {}'.format(len(mongo_data)))
174
175     for parsed_test_result in mongo_data:
176         shared_utils.publish_json(parsed_test_result, es_creds, output_destination)
177
178
179 if __name__ == '__main__':
180     parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
181     parser.add_argument('-od', '--output-destination',
182                         default='elasticsearch',
183                         choices=('elasticsearch', 'stdout'),
184                         help='defaults to elasticsearch')
185
186     parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
187                         help='get entries old at most N days from mongodb and'
188                              ' parse those that are not already in elasticsearch.'
189                              ' If not present, will get everything from mongodb, which is the default')
190
191     parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
192                         help='the url of elasticsearch, defaults to http://localhost:9200')
193
194     parser.add_argument('-u', '--elasticsearch-username', default=None,
195                         help='The username with password for elasticsearch in format username:password')
196
197     args = parser.parse_args()
198     base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
199     output_destination = args.output_destination
200     days = args.merge_latest
201     es_creds = args.elasticsearch_username
202
203     if output_destination == 'elasticsearch':
204         output_destination = base_elastic_url
205
206     # parsed_test_results will be printed/sent to elasticsearch
207     if days == 0:
208         publish_mongo_data(output_destination)
209     elif days > 0:
210         body = '''{{
211     "query" : {{
212         "range" : {{
213             "start_date" : {{
214                 "gte" : "now-{}d"
215             }}
216         }}
217     }}
218 }}'''.format(days)
219         elastic_data = shared_utils.get_elastic_data(base_elastic_url, es_creds, body)
220         logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_data)))
221         mongo_data = get_mongo_data(days)
222         publish_difference(mongo_data, elastic_data, output_destination, es_creds)
223     else:
224         raise Exception('Update must be non-negative')
225