Merge "refactor dashboard_construct"
[releng.git] / utils / test / dashboard / dashboard / mongo2elastic / main.py
1 #! /usr/bin/env python
2
3 import datetime
4 import json
5 import os
6 import subprocess
7 import traceback
8 import urlparse
9 import uuid
10
11 import argparse
12
13 from common import logger_utils, elastic_access
14 from conf import testcases
15 from conf.config import APIConfig
16 from mongo2elastic import format
17
# Logger used by every class and function in this module.
logger = logger_utils.DashboardLogger('mongo2elastic').get

# Command-line interface: where the config file lives and how far back to look.
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config-file",
                    help="Config file location",
                    dest='config_file')
parser.add_argument('-ld', '--latest-days',
                    metavar='N',
                    type=int,
                    default=0,
                    help='get entries old at most N days from mongodb and'
                         ' parse those that are not already in elasticsearch.'
                         ' If not present, will get everything from mongodb, which is the default')

# NOTE: the command line is parsed and the configuration is loaded as a side
# effect of importing this module.
args = parser.parse_args()
CONF = APIConfig().parse(args.config_file)


# Scratch file that mongoexport writes to and the publisher reads back; the
# uuid suffix gives each run its own file name.
tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
37
38
class DocumentVerification(object):
    """Normalise one exported result document and decide whether to skip it.

    The ``modify_*`` and ``mandatory_fields_exist`` methods mutate ``doc`` in
    place and return ``self`` so that they can be chained; ``is_skip()``
    reports the verdict afterwards.
    """

    # Fields that must be present with a non-None value. Any key that is not
    # listed here is removed from the document by mandatory_fields_exist().
    MANDATORY_FIELDS = ('installer',
                        'pod_name',
                        'version',
                        'case_name',
                        'project_name',
                        'details',
                        'start_date',
                        'scenario')

    def __init__(self, doc):
        """Wrap ``doc`` (a dict); its ``_id``, if any, is kept for log messages."""
        super(DocumentVerification, self).__init__()
        self.doc = doc
        self.doc_id = doc.get('_id')
        self.skip = False

    def mandatory_fields_exist(self):
        """Drop non-mandatory keys and flag the document if a mandatory one is missing.

        A mandatory field counts as missing when its key is absent or its
        value is ``None``. Returns ``self`` for chaining.
        """
        missing = list(self.MANDATORY_FIELDS)
        # Iterate over a snapshot: keys are deleted from self.doc inside the
        # loop, and mutating a dict while iterating its live view raises
        # RuntimeError on Python 3.
        for key, value in list(self.doc.items()):
            if key not in missing:
                del self.doc[key]
            elif value is None:
                logger.info("Skip testcase '%s' because field '%s' missing" %
                            (self.doc_id, key))
                self.skip = True
            else:
                missing.remove(key)

        if missing:
            logger.info("Skip testcase '%s' because field(s) '%s' missing" %
                        (self.doc_id, missing))
            self.skip = True

        return self

    def modify_start_date(self):
        """Rewrite ``start_date`` via ``_fix_date``; returns ``self``.

        A missing or ``None`` start date is left untouched so that
        ``mandatory_fields_exist`` can report it instead of this method
        failing with a TypeError.
        """
        field = 'start_date'
        if self.doc.get(field) is not None:
            self.doc[field] = self._fix_date(self.doc[field])

        return self

    def modify_scenario(self):
        """Default a missing/``None`` ``scenario`` to the ``version`` value; returns ``self``.

        When ``version`` is absent as well, ``scenario`` becomes ``None``
        (rather than raising KeyError) and the document is later flagged by
        ``mandatory_fields_exist``.
        """
        scenario = 'scenario'
        version = 'version'

        if self.doc.get(scenario) is None:
            self.doc[scenario] = self.doc.get(version)

        return self

    def is_skip(self):
        """Return True when the document has been flagged to be skipped."""
        return self.skip

    def _fix_date(self, date_string):
        """Return the date in the form stored by this tool.

        A dict is unwrapped to its ``'$date'`` entry. Any other value is
        treated as a string: the last three characters are dropped, spaces
        become ``'T'`` and ``'Z'`` is appended.
        """
        if isinstance(date_string, dict):
            return date_string['$date']
        # presumably trims microseconds down to milliseconds - TODO confirm
        return date_string[:-3].replace(' ', 'T') + 'Z'
98
99
class DocumentPublisher(object):
    """Verify, format and publish a single result document.

    Typical use is ``DocumentPublisher(...).format().publish()``.
    """

    def __init__(self, doc, fmt, exist_docs, creds, elastic_url):
        """Store the document and the publishing context.

        :param doc: the result document (a dict); it is modified in place
        :param fmt: name of a function in the ``format`` module, or a falsy
                    value when no formatter applies
        :param exist_docs: documents already published; used to avoid
                           publishing a duplicate
        :param creds: credentials handed to ``elastic_access.publish_docs``
        :param elastic_url: URL handed to ``elastic_access.publish_docs``
        """
        self.doc = doc
        self.fmt = fmt
        self.creds = creds
        self.exist_docs = exist_docs
        self.elastic_url = elastic_url
        self.is_formatted = True

    def format(self):
        """Verify the document and run its formatter; returns ``self``.

        ``is_formatted`` ends up holding the formatter's result, or False when
        verification fails, no formatter is configured, or an ``Exception`` is
        raised along the way (it is logged with its traceback).
        """
        try:
            if self._verify_document() and self.fmt:
                # Look the formatter up by name in the ``format`` module.
                self.is_formatted = vars(format)[self.fmt](self.doc)
            else:
                self.is_formatted = False
        except Exception:
            logger.error("Fail in format testcase[%s]\nerror message: %s" %
                         (self.doc, traceback.format_exc()))
            self.is_formatted = False
        # Returned after the try statement rather than from a ``finally``
        # clause: returning inside ``finally`` silently discards any in-flight
        # exception, including SystemExit and KeyboardInterrupt.
        return self

    def publish(self):
        """Publish the document unless formatting failed or it already exists."""
        if self.is_formatted and self.doc not in self.exist_docs:
            self._publish()

    def _publish(self):
        """Send the document and log the reported reason on failure."""
        status, data = elastic_access.publish_docs(self.elastic_url, self.creds, self.doc)
        # NOTE(review): a status of exactly 300 is treated as success here;
        # confirm whether ``>= 300`` was intended.
        if status > 300:
            try:
                reason = json.loads(data)['error']['reason']
            except (ValueError, KeyError, TypeError):
                # The response is not the expected JSON error structure; log
                # the raw payload instead of failing while reporting a failure.
                reason = data
            logger.error('Publish record[{}] failed, due to [{}]'
                         .format(self.doc, reason))

    def _fix_date(self, date_string):
        """Return a dict's ``'$date'`` entry, or the reformatted date string.

        Mirrors ``DocumentVerification._fix_date``; nothing in this class
        calls it, it is kept so the class interface stays unchanged.
        """
        if isinstance(date_string, dict):
            return date_string['$date']
        return date_string[:-3].replace(' ', 'T') + 'Z'

    def _verify_document(self):
        """Normalise the document and return True when it should be kept."""
        verification = (DocumentVerification(self.doc)
                        .modify_start_date()
                        .modify_scenario()
                        .mandatory_fields_exist())
        return not verification.is_skip()
145
146
class DocumentsPublisher(object):
    """Export one test case's results from MongoDB and publish the new ones.

    Intended call chain: ``export().get_existed_docs().publish()``.
    """

    def __init__(self, project, case, fmt, days, elastic_url, creds):
        """Store the selection criteria and the publishing context.

        :param project: value matched against ``project_name``
        :param case: value matched against ``case_name``
        :param fmt: formatter name passed on to each ``DocumentPublisher``
        :param days: look-back window in days; 0 selects everything
        :param elastic_url: URL passed to ``elastic_access``
        :param creds: credentials passed to ``elastic_access``
        """
        self.project = project
        self.case = case
        self.fmt = fmt
        self.days = days
        self.elastic_url = elastic_url
        self.creds = creds
        self.existed_docs = []

    def _mongo_query(self):
        """Return the mongoexport query as a JSON string.

        Built with ``json.dumps`` so that project or case names containing
        quotes or backslashes are escaped correctly.
        """
        query = {'project_name': self.project, 'case_name': self.case}
        if self.days > 0:
            past_time = datetime.datetime.today() - datetime.timedelta(days=self.days)
            query['start_date'] = {'$gt': str(past_time)}
        return json.dumps(query)

    def _elastic_query_body(self):
        """Return the search body selecting the already published documents."""
        bool_query = {
            'must': [
                {'match': {'project_name': self.project}},
                {'match': {'case_name': self.case}},
            ],
        }
        if self.days > 0:
            bool_query['filter'] = {
                'range': {'start_date': {'gte': 'now-{}d'.format(self.days)}},
            }
        return json.dumps({'query': {'bool': bool_query}})

    def export(self):
        """Dump the matching MongoDB documents into ``tmp_docs_file``.

        Returns ``self``. If the export fails, the error is logged, the
        scratch file is removed and the process exits with status -1.
        """
        cmd = ['mongoexport',
               '--db', 'test_results_collection',
               '--collection', 'results',
               '--query', self._mongo_query(),
               '--out', '{}'.format(tmp_docs_file)]
        try:
            subprocess.check_call(cmd)
        except Exception as err:
            logger.error("export mongodb failed: %s" % err)
            self._remove()
            # Raised directly instead of calling exit(), which is only
            # available when the ``site`` module has been loaded.
            raise SystemExit(-1)
        return self

    def get_existed_docs(self):
        """Fetch the already published documents into ``existed_docs``.

        Returns ``self``.

        :raises ValueError: if ``days`` is negative
        """
        if self.days < 0:
            raise ValueError('Update days must be non-negative')
        self.existed_docs = elastic_access.get_docs(self.elastic_url,
                                                    self.creds,
                                                    self._elastic_query_body())
        return self

    def publish(self):
        """Format and publish every exported document, one JSON document per line.

        The scratch file is removed afterwards, whether or not publishing
        succeeded.
        """
        try:
            # The ``with`` block closes the file; no explicit close is needed.
            with open(tmp_docs_file) as fdocs:
                for doc_line in fdocs:
                    DocumentPublisher(json.loads(doc_line),
                                      self.fmt,
                                      self.existed_docs,
                                      self.creds,
                                      self.elastic_url).format().publish()
        finally:
            self._remove()

    def _remove(self):
        """Delete the scratch file if it exists."""
        if os.path.exists(tmp_docs_file):
            os.remove(tmp_docs_file)
235
236
def main():
    """Export and publish the results of every configured test case.

    For each case of each project in ``testcases.testcases_yaml`` the results
    are exported, compared against what is already published, and the
    remaining ones are published.
    """
    elastic_url = urlparse.urljoin(CONF.es_url, '/test_results/mongo2elastic')
    latest_days = args.latest_days
    credentials = CONF.es_creds

    for project_name, cases in testcases.testcases_yaml.items():
        for entry in cases:
            case_name = entry.get('name')
            formatter = testcases.compose_format(entry.get('format'))
            publisher = DocumentsPublisher(project_name,
                                           case_name,
                                           formatter,
                                           latest_days,
                                           elastic_url,
                                           credentials)
            # Each step returns the publisher to continue with.
            exported = publisher.export()
            compared = exported.get_existed_docs()
            compared.publish()