3af7c0fa81227a43ce8bba1ea4b806569833b9e7
[releng.git] / utils / test / scripts / mongo_to_elasticsearch.py
1 #! /usr/bin/env python
2
3 import datetime
4 import json
5 import os
6 import subprocess
7 import traceback
8 import urlparse
9 import uuid
10
11 import argparse
12
13 import conf_utils
14 import logger_utils
15 import mongo2elastic_format
16 import shared_utils
17
# Module-wide logger used by the publisher classes in this file.
logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get

# Command-line interface.  Note that the arguments are parsed at import
# time, so the resulting `args` object is available as a module global.
parser = argparse.ArgumentParser(
    description='Modify and filter mongo json data for elasticsearch')

parser.add_argument(
    '-od', '--output-destination',
    choices=('elasticsearch', 'stdout'),
    default='elasticsearch',
    help='defaults to elasticsearch')

parser.add_argument(
    '-ml', '--merge-latest',
    type=int,
    default=0,
    metavar='N',
    help=('get entries old at most N days from mongodb and parse those'
          ' that are not already in elasticsearch. If not present, will'
          ' get everything from mongodb, which is the default'))

parser.add_argument(
    '-e', '--elasticsearch-url',
    default='http://localhost:9200',
    help='the url of elasticsearch, defaults to http://localhost:9200')

parser.add_argument(
    '-u', '--elasticsearch-username',
    default=None,
    help=('The username with password for elasticsearch'
          ' in format username:password'))

args = parser.parse_args()

# Scratch file that receives the mongoexport output; the random UUID in
# the name keeps separate runs from sharing the same file.
tmp_docs_file = './mongo-%s.json' % uuid.uuid4()
40
41
class DocumentPublisher(object):
    """Validate, format and publish one test-result document.

    Typical use is ``DocumentPublisher(...).format().publish()``.

    :param doc: the document (a dict) to process; it is modified in place
    :param fmt: name of a callable in the ``mongo2elastic_format`` module
                namespace that is applied to ``doc``; a falsy value marks
                the document as not formatted
    :param exist_docs: container of documents considered already
                       published; ``doc`` is skipped when found in it
    :param creds: passed through to ``shared_utils.publish_json``
    :param to: passed through to ``shared_utils.publish_json``
    """

    def __init__(self, doc, fmt, exist_docs, creds, to):
        self.doc = doc
        self.fmt = fmt
        self.creds = creds
        self.exist_docs = exist_docs
        self.to = to
        self.is_formatted = True

    def format(self):
        """Verify the document and run the configured formatter on it.

        The outcome is stored in ``self.is_formatted``: the formatter's
        return value on success, ``False`` when verification fails, when
        no formatter is configured, or when any ``Exception`` is raised
        (the traceback is logged).

        :return: ``self``, so that calls can be chained
        """
        try:
            if self._verify_document() and self.fmt:
                self.is_formatted = vars(mongo2elastic_format)[self.fmt](self.doc)
            else:
                self.is_formatted = False
        except Exception:
            logger.error("Fail in format testcase[%s]\nerror message: %s" %
                         (self.doc, traceback.format_exc()))
            self.is_formatted = False
        # Deliberately NOT inside a ``finally``: returning from ``finally``
        # would silently discard KeyboardInterrupt/SystemExit and any error
        # raised while logging.
        return self

    def publish(self):
        """Publish the document unless it is unformatted or already known."""
        if self.is_formatted and self.doc not in self.exist_docs:
            self._publish()

    def _publish(self):
        """Send the document via ``shared_utils.publish_json`` and log failures."""
        status, data = shared_utils.publish_json(self.doc, self.creds, self.to)
        # NOTE(review): a status of exactly 300 is treated as success here;
        # confirm whether ``>= 300`` was intended.
        if status > 300:
            try:
                reason = json.loads(data)['error']['reason']
            except (TypeError, ValueError, KeyError):
                # The response is not the expected JSON error structure;
                # report the raw payload instead of aborting the whole run.
                reason = data
            logger.error('Publish record[{}] failed, due to [{}]'
                         .format(self.doc, reason))

    def _fix_date(self, date_string):
        """Normalise a ``start_date`` value.

        A dict is unwrapped to its ``'$date'`` entry.  Any other value is
        treated as a string: the last three characters are dropped, spaces
        become ``'T'`` and ``'Z'`` is appended (presumably trimming
        microseconds to milliseconds -- TODO confirm the input format).
        """
        if isinstance(date_string, dict):
            return date_string['$date']
        return date_string[:-3].replace(' ', 'T') + 'Z'

    def _verify_document(self):
        """
        Check ``self.doc`` and clean it up in place.

        Mandatory fields:
            installer
            pod_name
            version
            case_name
            project_name
            details
            start_date (rewritten with ``_fix_date``)

            these fields must be present and must NOT be None

        Optional fields:
            description

            these fields will be preserved if they are NOT None

        Defaulted fields:
            scenario

            takes the value of ``version`` when missing or None

        Every other field is removed from the document.

        :return: True when the document is valid, False otherwise
        """
        mandatory_fields = ['installer',
                            'pod_name',
                            'version',
                            'case_name',
                            'project_name',
                            'details']
        mandatory_fields_to_modify = {'start_date': self._fix_date}
        fields_to_swap_or_add = {'scenario': 'version'}
        optional_fields = ['description']
        mongo_id = self.doc.get('_id')

        # Iterate over a snapshot: the loop deletes and reassigns entries of
        # self.doc, which is not allowed while iterating a live dict view.
        for key, value in list(self.doc.items()):
            if key in mandatory_fields:
                if value is None:
                    # empty mandatory field, invalid input
                    logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
                                " for mandatory field '{}'".format(mongo_id, key))
                    return False
                mandatory_fields.remove(key)
            elif key in mandatory_fields_to_modify:
                if value is None:
                    # empty mandatory field, invalid input
                    logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
                                " for mandatory field '{}'".format(mongo_id, key))
                    return False
                self.doc[key] = mandatory_fields_to_modify.pop(key)(value)
            elif key in fields_to_swap_or_add:
                swapped_key = fields_to_swap_or_add.pop(key)
                if value is None:
                    swapped_value = self.doc[swapped_key]
                    logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key,
                                                                                                       swapped_value))
                    self.doc[key] = swapped_value
            elif key in optional_fields:
                if value is None:
                    # empty optional field, remove
                    del self.doc[key]
                optional_fields.remove(key)
            else:
                # unknown field
                del self.doc[key]

        if mandatory_fields:
            # some mandatory fields are missing
            logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
                        " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
            return False
        if mandatory_fields_to_modify:
            # some mandatory fields are missing
            logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
                        " mandatory field(s) '{}'".format(mongo_id, list(mandatory_fields_to_modify)))
            return False

        # Fields that never appeared in the document get their fallback value.
        for key, swap_key in fields_to_swap_or_add.items():
            self.doc[key] = self.doc[swap_key]
        return True
165
166
class DocumentsPublisher(object):
    """Export the results of one project/case from mongodb and publish them.

    Typical use is
    ``DocumentsPublisher(...).export().get_existed_docs().publish()``.

    :param project: value matched against ``project_name`` in mongodb
    :param case: value matched against ``case_name`` in mongodb
    :param fmt: formatter name handed to each ``DocumentPublisher``
    :param days: when greater than 0, only results whose ``start_date`` is
                 newer than that many days ago are exported
    :param elastic_url: passed to ``shared_utils.get_elastic_docs_by_days``
    :param creds: credentials passed on to the ``shared_utils`` helpers
    :param to: destination handed to each ``DocumentPublisher``
    """

    def __init__(self, project, case, fmt, days, elastic_url, creds, to):
        self.project = project
        self.case = case
        self.fmt = fmt
        self.days = days
        self.elastic_url = elastic_url
        self.creds = creds
        self.to = to
        self.existed_docs = []

    def _build_query(self):
        """Return the mongoexport query for this project/case as JSON text.

        The query is serialised with ``json.dumps`` so that quotes or
        backslashes in the project or case name cannot break it.
        """
        query = {'project_name': self.project, 'case_name': self.case}
        if self.days > 0:
            past_time = datetime.datetime.today() - datetime.timedelta(days=self.days)
            query['start_date'] = {'$gt': str(past_time)}
        return json.dumps(query)

    def export(self):
        """Dump the matching mongodb documents into ``tmp_docs_file``.

        Runs ``mongoexport`` as a subprocess.  On any failure the error is
        logged, the temporary file is removed and the process exits with
        status -1.

        :return: ``self``, so that calls can be chained
        """
        cmd = ['mongoexport',
               '--db', 'test_results_collection',
               '--collection', 'results',
               '--query', self._build_query(),
               '--out', '{}'.format(tmp_docs_file)]
        try:
            subprocess.check_call(cmd)
        except Exception as err:
            logger.error("export mongodb failed: %s" % err)
            self._remove()
            raise SystemExit(-1)
        return self

    def get_existed_docs(self):
        """Load the documents already stored in elasticsearch.

        The result of ``shared_utils.get_elastic_docs_by_days`` is kept in
        ``self.existed_docs``.

        :return: ``self``, so that calls can be chained
        """
        self.existed_docs = shared_utils.get_elastic_docs_by_days(self.elastic_url, self.creds, self.days)
        return self

    def publish(self):
        """Publish every exported document, then delete the temporary file.

        Each line of ``tmp_docs_file`` is parsed as one JSON document and
        handed to a ``DocumentPublisher``.
        """
        try:
            # The ``with`` statement closes the file; no explicit close is
            # needed (and none is possible if open() itself fails).
            with open(tmp_docs_file) as fdocs:
                for doc_line in fdocs:
                    DocumentPublisher(json.loads(doc_line),
                                      self.fmt,
                                      self.existed_docs,
                                      self.creds,
                                      self.to).format().publish()
        finally:
            self._remove()

    def _remove(self):
        """Delete the temporary export file if it exists."""
        if os.path.exists(tmp_docs_file):
            os.remove(tmp_docs_file)
225
226
if __name__ == '__main__':
    # Resolve the command-line options once, up front.
    es_creds = args.elasticsearch_username
    days = args.merge_latest
    base_elastic_url = urlparse.urljoin(args.elasticsearch_url,
                                        '/test_results/mongo2elastic')

    # 'elasticsearch' as destination means "post to the index URL"; any
    # other choice is handed on unchanged.
    if args.output_destination == 'elasticsearch':
        to = base_elastic_url
    else:
        to = args.output_destination

    # One export / compare / publish cycle per configured test case.
    for project, case_dicts in conf_utils.testcases_yaml.items():
        for case_dict in case_dicts:
            case = case_dict.get('name')
            fmt = conf_utils.compose_format(case_dict.get('format'))
            publisher = DocumentsPublisher(project, case, fmt, days,
                                           base_elastic_url, es_creds, to)
            publisher.export().get_existed_docs().publish()