b722793b3b4439f370f7211b4214df9cf2441403
[releng.git] / utils / test / scripts / mongo_to_elasticsearch.py
1 #! /usr/bin/env python
2 import datetime
3 import json
4 import os
5 import subprocess
6 import traceback
7 import urlparse
8 import uuid
9
10 import argparse
11
12 import conf_utils
13 import logger_utils
14 import mongo2elastic_format
15 import shared_utils
16
17 logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get
18
19 parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
20 parser.add_argument('-od', '--output-destination',
21                     default='elasticsearch',
22                     choices=('elasticsearch', 'stdout'),
23                     help='defaults to elasticsearch')
24
25 parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
26                     help='get entries old at most N days from mongodb and'
27                          ' parse those that are not already in elasticsearch.'
28                          ' If not present, will get everything from mongodb, which is the default')
29
30 parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
31                     help='the url of elasticsearch, defaults to http://localhost:9200')
32
33 parser.add_argument('-u', '--elasticsearch-username', default=None,
34                     help='The username with password for elasticsearch in format username:password')
35
36 args = parser.parse_args()
37
38 tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
39
40
41 def _fix_date(date_string):
42     if isinstance(date_string, dict):
43         return date_string['$date']
44     else:
45         return date_string[:-3].replace(' ', 'T') + 'Z'
46
47
48 def verify_document(testcase):
49     """
50     Mandatory fields:
51         installer
52         pod_name
53         version
54         case_name
55         date
56         project
57         details
58
59         these fields must be present and must NOT be None
60
61     Optional fields:
62         description
63
64         these fields will be preserved if the are NOT None
65     """
66     mandatory_fields = ['installer',
67                         'pod_name',
68                         'version',
69                         'case_name',
70                         'project_name',
71                         'details']
72     mandatory_fields_to_modify = {'start_date': _fix_date}
73     fields_to_swap_or_add = {'scenario': 'version'}
74     if '_id' in testcase:
75         mongo_id = testcase['_id']
76     else:
77         mongo_id = None
78     optional_fields = ['description']
79     for key, value in testcase.items():
80         if key in mandatory_fields:
81             if value is None:
82                 # empty mandatory field, invalid input
83                 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
84                             " for mandatory field '{}'".format(mongo_id, key))
85                 return False
86             else:
87                 mandatory_fields.remove(key)
88         elif key in mandatory_fields_to_modify:
89             if value is None:
90                 # empty mandatory field, invalid input
91                 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
92                             " for mandatory field '{}'".format(mongo_id, key))
93                 return False
94             else:
95                 testcase[key] = mandatory_fields_to_modify[key](value)
96                 del mandatory_fields_to_modify[key]
97         elif key in fields_to_swap_or_add:
98             if value is None:
99                 swapped_key = fields_to_swap_or_add[key]
100                 swapped_value = testcase[swapped_key]
101                 logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key, swapped_value))
102                 testcase[key] = swapped_value
103                 del fields_to_swap_or_add[key]
104             else:
105                 del fields_to_swap_or_add[key]
106         elif key in optional_fields:
107             if value is None:
108                 # empty optional field, remove
109                 del testcase[key]
110             optional_fields.remove(key)
111         else:
112             # unknown field
113             del testcase[key]
114
115     if len(mandatory_fields) > 0:
116         # some mandatory fields are missing
117         logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
118                     " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
119         return False
120     elif len(mandatory_fields_to_modify) > 0:
121         # some mandatory fields are missing
122         logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
123                     " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
124         return False
125     else:
126         if len(fields_to_swap_or_add) > 0:
127             for key, swap_key in fields_to_swap_or_add.iteritems():
128                 testcase[key] = testcase[swap_key]
129
130         return True
131
132
133 def format_document(testcase):
134     # 1. verify and identify the testcase
135     # 2. if modification is implemented, then use that
136     # 3. if not, try to use default
137     # 4. if 2 or 3 is successful, return True, otherwise return False
138     if verify_document(testcase):
139         project = testcase['project_name']
140         case_name = testcase['case_name']
141         fmt = conf_utils.get_format(project, case_name)
142         if fmt:
143             try:
144                 logger.info("Processing %s/%s using format %s" % (project, case_name, fmt))
145                 return vars(mongo2elastic_format)[fmt](testcase)
146             except Exception:
147                 logger.error("Fail in format testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
148                 return False
149     else:
150         return False
151
152
153 def export_documents(days):
154     cmd = ['mongoexport', '--db', 'test_results_collection', '-c', 'results']
155     if days > 0:
156         past_time = datetime.datetime.today() - datetime.timedelta(days=days)
157         cmd += ['--query', '{{"start_date":{{$gt:"{}"}}}}'.format(past_time)]
158     cmd += [ '--out', '{}'.format(tmp_docs_file)]
159
160     try:
161         subprocess.check_call(cmd)
162     except Exception, err:
163         logger.error("export mongodb failed: %s" % err)
164         exit(-1)
165
166
167 def publish_document(document, es_creds, to):
168     status, data = shared_utils.publish_json(document, es_creds, to)
169     if status > 300:
170         logger.error('Publish record[{}] failed, due to [{}]'
171                     .format(document, json.loads(data)['error']['reason']))
172
173
174 def publish_nonexist_documents(elastic_docs, es_creds, to):
175     try:
176         with open(tmp_docs_file) as fdocs:
177             for doc_line in fdocs:
178                 doc = json.loads(doc_line)
179                 if format_document(doc) and doc not in elastic_docs:
180                     publish_document(doc, es_creds, to)
181     finally:
182         fdocs.close()
183         if os.path.exists(tmp_docs_file):
184             os.remove(tmp_docs_file)
185
186
187 if __name__ == '__main__':
188     base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
189     to = args.output_destination
190     days = args.merge_latest
191     es_creds = args.elasticsearch_username
192
193     if to == 'elasticsearch':
194         to = base_elastic_url
195
196     export_documents(days)
197     elastic_docs = shared_utils.get_elastic_docs_by_days(base_elastic_url, es_creds, days)
198     logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_docs)))
199     publish_nonexist_documents(elastic_docs, es_creds, to)