Use a configuration file rather than command-line arguments to organize the configuration
[releng.git] / utils / test / scripts / mongo_to_elasticsearch.py
1 #! /usr/bin/env python
2
3 import datetime
4 import json
5 import os
6 import subprocess
7 import traceback
8 import urlparse
9 import uuid
10
11 import argparse
12
13 import logger_utils
14 import mongo2elastic_format
15 import shared_utils
16 import testcases_parser
17 from config import APIConfig
18
# Shared logger used by the publisher classes and functions in this script.
logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get

# Command-line options: where the config file lives and, optionally, how many
# days back to look when exporting entries from mongodb.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-c", "--config-file",
    dest='config_file',
    help="Config file location",
)
parser.add_argument(
    '-ld', '--latest-days',
    default=0,
    type=int,
    metavar='N',
    help=(
        'get entries old at most N days from mongodb and'
        ' parse those that are not already in elasticsearch.'
        ' If not present, will get everything from mongodb, which is the default'
    ),
)

# Arguments are parsed at import time; CONF is built from the config file path.
args = parser.parse_args()
CONF = APIConfig().parse(args.config_file)


# Scratch file for the exported documents; the random UUID keeps the name
# unique per run.
tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
38
39
class DocumentPublisher(object):
    """Validate, format and publish one exported mongo document.

    Typical use is the fluent chain ``DocumentPublisher(...).format().publish()``.

    Attributes:
        doc: the document (a dict); it is modified in place by validation.
        fmt: name of a formatter function in ``mongo2elastic_format``; a
            falsy value means the document is never published.
        exist_docs: documents already present at the destination; a document
            equal to one of these is not published again.
        creds: credentials handed to ``shared_utils.publish_json``.
        to: destination handed to ``shared_utils.publish_json``.
        is_formatted: result of the last ``format()`` call; only truthy
            documents are published.
    """

    def __init__(self, doc, fmt, exist_docs, creds, to):
        self.doc = doc
        self.fmt = fmt
        self.creds = creds
        self.exist_docs = exist_docs
        self.to = to
        self.is_formatted = True

    def format(self):
        """Validate the document and run the configured formatter on it.

        Any ``Exception`` raised during validation or formatting is logged
        and marks the document as not formatted instead of propagating.

        Returns:
            self, so that ``publish()`` can be chained.
        """
        try:
            if self._verify_document() and self.fmt:
                self.is_formatted = vars(mongo2elastic_format)[self.fmt](self.doc)
            else:
                self.is_formatted = False
        except Exception:
            logger.error("Fail in format testcase[%s]\nerror message: %s" %
                         (self.doc, traceback.format_exc()))
            self.is_formatted = False
        # Returned after the try block rather than from a ``finally`` clause:
        # a ``return`` inside ``finally`` would also swallow KeyboardInterrupt
        # and SystemExit.
        return self

    def publish(self):
        """Publish the document if it was formatted and is not already known."""
        if self.is_formatted and self.doc not in self.exist_docs:
            self._publish()

    def _publish(self):
        """Send the document to the destination and log a failed request."""
        status, data = shared_utils.publish_json(self.doc, self.creds, self.to)
        if status > 300:
            # The response body may not be the expected JSON error structure;
            # fall back to the raw body so error reporting itself cannot fail.
            try:
                reason = json.loads(data)['error']['reason']
            except (ValueError, KeyError, TypeError):
                reason = data
            logger.error('Publish record[{}] failed, due to [{}]'
                         .format(self.doc, reason))

    def _fix_date(self, date_string):
        """Return the date value in the form stored in the published document.

        A dict value is unwrapped to its ``'$date'`` entry.  Any other value
        is treated as a string: its last three characters are dropped, the
        space is replaced by ``'T'`` and ``'Z'`` is appended, e.g.
        ``'2016-09-20 10:11:12.123456'`` -> ``'2016-09-20T10:11:12.123Z'``.
        """
        if isinstance(date_string, dict):
            return date_string['$date']
        else:
            return date_string[:-3].replace(' ', 'T') + 'Z'

    def _verify_document(self):
        """
        Check the document's fields and normalize it in place.

        Mandatory fields:
            installer
            pod_name
            version
            case_name
            project_name
            details
            start_date (rewritten through _fix_date)

            these fields must be present and must NOT be None

        Swapped or added fields:
            scenario

            when missing or None it is filled with the value of 'version'

        Optional fields:
            description

            these fields will be preserved if they are NOT None

        Every other field (including '_id') is removed from the document.

        Returns:
            True when the document is valid, False otherwise.
        """
        mandatory_fields = ['installer',
                            'pod_name',
                            'version',
                            'case_name',
                            'project_name',
                            'details']
        mandatory_fields_to_modify = {'start_date': self._fix_date}
        fields_to_swap_or_add = {'scenario': 'version'}
        optional_fields = ['description']
        # Kept only for log messages; '_id' itself is dropped as unknown field.
        mongo_id = self.doc.get('_id')

        # Iterate over a snapshot: the loop deletes and rewrites entries of
        # self.doc, which is not allowed while iterating a live dict view.
        for key, value in list(self.doc.items()):
            if key in mandatory_fields:
                if value is None:
                    # empty mandatory field, invalid input
                    logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
                                " for mandatory field '{}'".format(mongo_id, key))
                    return False
                mandatory_fields.remove(key)
            elif key in mandatory_fields_to_modify:
                if value is None:
                    # empty mandatory field, invalid input
                    logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
                                " for mandatory field '{}'".format(mongo_id, key))
                    return False
                self.doc[key] = mandatory_fields_to_modify[key](value)
                del mandatory_fields_to_modify[key]
            elif key in fields_to_swap_or_add:
                if value is None:
                    swapped_key = fields_to_swap_or_add[key]
                    swapped_value = self.doc[swapped_key]
                    logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key,
                                                                                                       swapped_value))
                    self.doc[key] = swapped_value
                # present (swapped or not): nothing left to add afterwards
                del fields_to_swap_or_add[key]
            elif key in optional_fields:
                if value is None:
                    # empty optional field, remove
                    del self.doc[key]
                optional_fields.remove(key)
            else:
                # unknown field
                del self.doc[key]

        if mandatory_fields:
            # some mandatory fields are missing
            logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
                        " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
            return False
        if mandatory_fields_to_modify:
            # some mandatory fields are missing
            logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
                        " mandatory field(s) '{}'".format(mongo_id, list(mandatory_fields_to_modify)))
            return False

        # Fields that never appeared in the document are added from their
        # source field, which is guaranteed to exist at this point.
        for key, swap_key in fields_to_swap_or_add.items():
            self.doc[key] = self.doc[swap_key]
        return True
163
164
class DocumentsPublisher(object):
    """Export the mongo results of one project/case and publish each of them.

    Typical use is the fluent chain
    ``DocumentsPublisher(...).export().get_existed_docs().publish()``.

    Attributes:
        project: value matched against ``project_name`` in the mongo query.
        case: value matched against ``case_name`` in the mongo query.
        fmt: formatter name passed on to every ``DocumentPublisher``.
        days: when greater than 0, only results whose ``start_date`` is
            within the last ``days`` days are exported.
        elastic_url: URL used to look up documents that already exist.
        creds: credentials for the lookup and for publishing.
        to: publishing destination passed on to every ``DocumentPublisher``.
        existed_docs: documents found by ``get_existed_docs()``.
    """

    def __init__(self, project, case, fmt, days, elastic_url, creds, to):
        self.project = project
        self.case = case
        self.fmt = fmt
        self.days = days
        self.elastic_url = elastic_url
        self.creds = creds
        self.to = to
        self.existed_docs = []

    def _build_query(self):
        """Return the mongoexport query for this project/case as a JSON string.

        The query is serialized with ``json.dumps`` so that quotes or
        backslashes in the project or case name cannot break the JSON.
        """
        query = {
            'project_name': '{}'.format(self.project),
            'case_name': '{}'.format(self.case),
        }
        if self.days > 0:
            past_time = datetime.datetime.today() - datetime.timedelta(days=self.days)
            query['start_date'] = {'$gt': '{}'.format(past_time)}
        return json.dumps(query)

    def export(self):
        """Dump the matching mongo documents into the temporary file.

        Runs ``mongoexport``; on any failure the error is logged, the
        temporary file is removed and the process exits with status -1.

        Returns:
            self, so that further calls can be chained.
        """
        cmd = ['mongoexport',
               '--db', 'test_results_collection',
               '--collection', 'results',
               '--query', self._build_query(),
               '--out', tmp_docs_file]
        try:
            subprocess.check_call(cmd)
        except Exception as err:
            logger.error("export mongodb failed: %s" % err)
            self._remove()
            # Raised directly rather than through the ``exit`` builtin, which
            # is only installed by the ``site`` module.
            raise SystemExit(-1)
        return self

    def get_existed_docs(self):
        """Fetch the documents already present in elasticsearch.

        Returns:
            self, so that further calls can be chained.
        """
        self.existed_docs = shared_utils.get_elastic_docs_by_days(self.elastic_url, self.creds, self.days)
        return self

    def publish(self):
        """Publish every exported document, then delete the temporary file.

        The export file is read one JSON document per line.  The temporary
        file is removed even when reading or publishing fails.
        """
        try:
            # The ``with`` block closes the file; no explicit close is needed
            # (and none is attempted if ``open`` itself fails).
            with open(tmp_docs_file) as fdocs:
                for doc_line in fdocs:
                    if not doc_line.strip():
                        # nothing to parse on a blank line
                        continue
                    DocumentPublisher(json.loads(doc_line),
                                      self.fmt,
                                      self.existed_docs,
                                      self.creds,
                                      self.to).format().publish()
        finally:
            self._remove()

    def _remove(self):
        """Delete the temporary export file if it exists."""
        if os.path.exists(tmp_docs_file):
            os.remove(tmp_docs_file)
223
224
def main():
    """Export and publish the results of every configured test case.

    Walks ``testcases_parser.testcases_yaml`` (project -> list of case
    dicts) and, for each case, exports its mongo results, loads the
    documents that already exist in elasticsearch and publishes the rest.
    When the configured destination is ``'elasticsearch'`` the documents go
    to the elasticsearch URL built from the configuration.
    """
    elastic_endpoint = urlparse.urljoin(CONF.elastic_url, '/test_results/mongo2elastic')
    destination = CONF.destination
    latest_days = args.latest_days
    credentials = CONF.elastic_creds

    # The literal 'elasticsearch' is a shorthand for the endpoint itself.
    target = elastic_endpoint if destination == 'elasticsearch' else destination

    for project_name, cases in testcases_parser.testcases_yaml.items():
        for entry in cases:
            publisher = DocumentsPublisher(
                project_name,
                entry.get('name'),
                testcases_parser.compose_format(entry.get('format')),
                latest_days,
                elastic_endpoint,
                credentials,
                target)
            exported = publisher.export()
            with_existing = exported.get_existed_docs()
            with_existing.publish()
245
246
# Run the export/publish cycle only when executed as a script.
if __name__ == "__main__":
    main()