6799574f5d0e032bb76cb35ef243b3124a09203e
[releng.git] / utils / test / scripts / mongo_to_elasticsearch.py
1 #! /usr/bin/env python
2 import datetime
3 import json
4 import logging
5 import os
6 import subprocess
7 import traceback
8 import urlparse
9 import uuid
10
11 import argparse
12
13 import conf_utils
14 import shared_utils
15 import mongo2elastic_format
16
17 logger = logging.getLogger('mongo_to_elasticsearch')
18 logger.setLevel(logging.DEBUG)
19 file_handler = logging.FileHandler('/var/log/{}.log'.format('mongo_to_elasticsearch'))
20 file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
21 logger.addHandler(file_handler)
22
23
24 def _fix_date(date_string):
25     if isinstance(date_string, dict):
26         return date_string['$date']
27     else:
28         return date_string[:-3].replace(' ', 'T') + 'Z'
29
30
31 def verify_mongo_entry(testcase):
32     """
33     Mandatory fields:
34         installer
35         pod_name
36         version
37         case_name
38         date
39         project
40         details
41
42         these fields must be present and must NOT be None
43
44     Optional fields:
45         description
46
47         these fields will be preserved if the are NOT None
48     """
49     mandatory_fields = ['installer',
50                         'pod_name',
51                         'version',
52                         'case_name',
53                         'project_name',
54                         'details']
55     mandatory_fields_to_modify = {'start_date': _fix_date}
56     fields_to_swap_or_add = {'scenario': 'version'}
57     if '_id' in testcase:
58         mongo_id = testcase['_id']
59     else:
60         mongo_id = None
61     optional_fields = ['description']
62     for key, value in testcase.items():
63         if key in mandatory_fields:
64             if value is None:
65                 # empty mandatory field, invalid input
66                 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
67                             " for mandatory field '{}'".format(mongo_id, key))
68                 return False
69             else:
70                 mandatory_fields.remove(key)
71         elif key in mandatory_fields_to_modify:
72             if value is None:
73                 # empty mandatory field, invalid input
74                 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
75                             " for mandatory field '{}'".format(mongo_id, key))
76                 return False
77             else:
78                 testcase[key] = mandatory_fields_to_modify[key](value)
79                 del mandatory_fields_to_modify[key]
80         elif key in fields_to_swap_or_add:
81             if value is None:
82                 swapped_key = fields_to_swap_or_add[key]
83                 swapped_value = testcase[swapped_key]
84                 logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key, swapped_value))
85                 testcase[key] = swapped_value
86                 del fields_to_swap_or_add[key]
87             else:
88                 del fields_to_swap_or_add[key]
89         elif key in optional_fields:
90             if value is None:
91                 # empty optional field, remove
92                 del testcase[key]
93             optional_fields.remove(key)
94         else:
95             # unknown field
96             del testcase[key]
97
98     if len(mandatory_fields) > 0:
99         # some mandatory fields are missing
100         logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
101                     " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
102         return False
103     elif len(mandatory_fields_to_modify) > 0:
104         # some mandatory fields are missing
105         logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
106                     " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
107         return False
108     else:
109         if len(fields_to_swap_or_add) > 0:
110             for key, swap_key in fields_to_swap_or_add.iteritems():
111                 testcase[key] = testcase[swap_key]
112
113         return True
114
115
116 def modify_mongo_entry(testcase):
117     # 1. verify and identify the testcase
118     # 2. if modification is implemented, then use that
119     # 3. if not, try to use default
120     # 4. if 2 or 3 is successful, return True, otherwise return False
121     if verify_mongo_entry(testcase):
122         project = testcase['project_name']
123         case_name = testcase['case_name']
124         fmt = conf_utils.get_format(project, case_name)
125         if fmt:
126             try:
127                 logger.info("Processing %s/%s using format %s" % (project, case_name, fmt))
128                 return vars(mongo2elastic_format)[fmt](testcase)
129             except Exception:
130                 logger.error("Fail in modify testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
131     else:
132         return False
133
134
135 def publish_mongo_data(output_destination):
136     tmp_filename = 'mongo-{}.log'.format(uuid.uuid4())
137     try:
138         subprocess.check_call(['mongoexport',
139                                '--db', 'test_results_collection',
140                                '-c', 'results',
141                                '--out', tmp_filename])
142         with open(tmp_filename) as fobj:
143             for mongo_json_line in fobj:
144                 test_result = json.loads(mongo_json_line)
145                 if modify_mongo_entry(test_result):
146                     status, data = shared_utils.publish_json(test_result, es_creds, output_destination)
147                     if status > 300:
148                         project = test_result['project_name']
149                         case_name = test_result['case_name']
150                         logger.info('project {} case {} publish failed, due to [{}]'
151                                     .format(project, case_name, json.loads(data)['error']['reason']))
152     finally:
153         if os.path.exists(tmp_filename):
154             os.remove(tmp_filename)
155
156
157 def get_mongo_data(days):
158     past_time = datetime.datetime.today() - datetime.timedelta(days=days)
159     mongo_json_lines = subprocess.check_output(['mongoexport', '--db', 'test_results_collection', '-c', 'results',
160                                                 '--query', '{{"start_date":{{$gt:"{}"}}}}'
161                                                .format(past_time)]).splitlines()
162
163     mongo_data = []
164     for mongo_json_line in mongo_json_lines:
165         test_result = json.loads(mongo_json_line)
166         if modify_mongo_entry(test_result):
167             # if the modification could be applied, append the modified result
168             mongo_data.append(test_result)
169     return mongo_data
170
171
172 def publish_difference(mongo_data, elastic_data, output_destination, es_creds):
173     for elastic_entry in elastic_data:
174         if elastic_entry in mongo_data:
175             mongo_data.remove(elastic_entry)
176
177     logger.info('number of parsed test results: {}'.format(len(mongo_data)))
178
179     for parsed_test_result in mongo_data:
180         shared_utils.publish_json(parsed_test_result, es_creds, output_destination)
181
182
183 if __name__ == '__main__':
184     parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
185     parser.add_argument('-od', '--output-destination',
186                         default='elasticsearch',
187                         choices=('elasticsearch', 'stdout'),
188                         help='defaults to elasticsearch')
189
190     parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
191                         help='get entries old at most N days from mongodb and'
192                              ' parse those that are not already in elasticsearch.'
193                              ' If not present, will get everything from mongodb, which is the default')
194
195     parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
196                         help='the url of elasticsearch, defaults to http://localhost:9200')
197
198     parser.add_argument('-u', '--elasticsearch-username', default=None,
199                         help='The username with password for elasticsearch in format username:password')
200
201     args = parser.parse_args()
202     base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
203     output_destination = args.output_destination
204     days = args.merge_latest
205     es_creds = args.elasticsearch_username
206
207     if output_destination == 'elasticsearch':
208         output_destination = base_elastic_url
209
210     # parsed_test_results will be printed/sent to elasticsearch
211     if days == 0:
212         publish_mongo_data(output_destination)
213     elif days > 0:
214         body = '''{{
215     "query" : {{
216         "range" : {{
217             "start_date" : {{
218                 "gte" : "now-{}d"
219             }}
220         }}
221     }}
222 }}'''.format(days)
223         elastic_data = shared_utils.get_elastic_data(base_elastic_url, es_creds, body)
224         logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_data)))
225         mongo_data = get_mongo_data(days)
226         publish_difference(mongo_data, elastic_data, output_destination, es_creds)
227     else:
228         raise Exception('Update must be non-negative')
229