14 import mongo2elastic_format
16 import testcases_parser
17 from config import APIConfig
# Module-level logger taken from the project's KibanaDashboardLogger helper.
19 logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get
# Command-line interface: a config-file location and an optional
# "latest days" window that limits how far back results are fetched.
21 parser = argparse.ArgumentParser()
22 parser.add_argument("-c", "--config-file",
24 help="Config file location")
25 parser.add_argument('-ld', '--latest-days',
29 help='get entries old at most N days from mongodb and'
30 ' parse those that are not already in elasticsearch.'
31 ' If not present, will get everything from mongodb, which is the default')
# NOTE(review): arguments are parsed and the config is loaded at module level,
# so both happen as soon as this module is imported or run.
33 args = parser.parse_args()
34 CONF = APIConfig().parse(args.config_file)
# File that receives the mongoexport output; the uuid4 part gives every run
# its own file name.
37 tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
# Handles one exported mongo document: formats it, then publishes it unless
# an equal document is already listed in exist_docs.
40 class DocumentPublisher:
# doc        -- the document to process
# fmt        -- name of a formatter looked up in the mongo2elastic_format module
# exist_docs -- documents that must not be published again
# creds, to  -- handed to shared_utils.publish_json together with the document
42 def __init__(self, doc, fmt, exist_docs, creds, to):
46 self.exist_docs = exist_docs
# Starts out True; the formatting step below overwrites it.
48 self.is_formatted = True
# Formatting step: the document must pass _verify_document() and a formatter
# name must be set. The formatter is resolved by name in the module namespace
# of mongo2elastic_format and its return value becomes is_formatted.
52 if self._verify_document() and self.fmt:
53 self.is_formatted = vars(mongo2elastic_format)[self.fmt](self.doc)
55 self.is_formatted = False
# Failure path: log the document together with the current traceback and mark
# it as not formatted.
57 logger.error("Fail in format testcase[%s]\nerror message: %s" %
58 (self.doc, traceback.format_exc()))
59 self.is_formatted = False
# Publish only documents that were formatted and are not in exist_docs.
64 if self.is_formatted and self.doc not in self.exist_docs:
# publish_json returns a status and the response body.
68 status, data = shared_utils.publish_json(self.doc, self.creds, self.to)
# The response body is parsed as JSON and its error/reason entry is logged.
# NOTE(review): this lookup raises if the body does not have that shape --
# confirm what publish_json returns on failure.
70 logger.error('Publish record[{}] failed, due to [{}]'
71 .format(self.doc, json.loads(data)['error']['reason']))
73 def _fix_date(self, date_string):
74 if isinstance(date_string, dict):
75 return date_string['$date']
77 return date_string[:-3].replace(' ', 'T') + 'Z'
# Validates and normalises self.doc in place before it is formatted.
# Mandatory fields must be present, 'start_date' is rewritten through
# _fix_date, 'scenario' is filled from 'version' when it is empty or absent,
# and 'description' is optional. The caller uses the result as a truth value.
79 def _verify_document(self):
90 these fields must be present and must NOT be None
95 these fields will be preserved if the are NOT None
97 mandatory_fields = ['installer',
# Mandatory fields whose value is replaced by a converted one:
# field name -> converter.
103 mandatory_fields_to_modify = {'start_date': self._fix_date}
# Target field -> source field whose value is copied into the target.
104 fields_to_swap_or_add = {'scenario': 'version'}
# The mongo _id is used in the log messages below to identify the document.
105 if '_id' in self.doc:
106 mongo_id = self.doc['_id']
109 optional_fields = ['description']
# Single pass over the document: every recognised key is removed from its
# bookkeeping collection, so whatever is left afterwards was never seen.
110 for key, value in self.doc.items():
111 if key in mandatory_fields:
113 # empty mandatory field, invalid input
114 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
115 " for mandatory field '{}'".format(mongo_id, key))
# Field handled: tick it off.
118 mandatory_fields.remove(key)
119 elif key in mandatory_fields_to_modify:
121 # empty mandatory field, invalid input
122 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
123 " for mandatory field '{}'".format(mongo_id, key))
# Replace the value with its converted form and tick the field off.
126 self.doc[key] = mandatory_fields_to_modify[key](value)
127 del mandatory_fields_to_modify[key]
128 elif key in fields_to_swap_or_add:
# Take the value of the paired source field instead (the log message below
# describes the current value as None).
130 swapped_key = fields_to_swap_or_add[key]
131 swapped_value = self.doc[swapped_key]
132 logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key,
134 self.doc[key] = swapped_value
135 del fields_to_swap_or_add[key]
137 del fields_to_swap_or_add[key]
138 elif key in optional_fields:
140 # empty optional field, remove
142 optional_fields.remove(key)
# Anything still listed here was absent from the document altogether.
147 if len(mandatory_fields) > 0:
148 # some mandatory fields are missing
149 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
150 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
152 elif len(mandatory_fields_to_modify) > 0:
153 # some mandatory fields are missing
154 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
155 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
# Target fields that never appeared in the document are created from their
# source field.
# NOTE(review): dict.iteritems() exists only on Python 2, and
# self.doc[swap_key] raises KeyError when the source field is absent as well.
158 if len(fields_to_swap_or_add) > 0:
159 for key, swap_key in fields_to_swap_or_add.iteritems():
160 self.doc[key] = self.doc[swap_key]
# Publishes all results of one project/test case: exports them from MongoDB
# with mongoexport, fetches the documents already stored in Elasticsearch,
# then formats and publishes the exported documents one at a time.
165 class DocumentsPublisher:
167 def __init__(self, project, case, fmt, days, elastic_url, creds, to):
168 self.project = project
172 self.elastic_url = elastic_url
# Filled in by get_existed_docs().
175 self.existed_docs = []
# Query variant limited to results whose start_date is later than
# self.days days before now; the second variant that follows it carries no
# start_date restriction.
179 past_time = datetime.datetime.today() - datetime.timedelta(days=self.days)
181 "project_name": "{}",
183 "start_date": {{"$gt" : "{}"}}
184 }}'''.format(self.project, self.case, past_time)
187 "project_name": "{}",
189 }}'''.format(self.project, self.case)
# mongoexport writes the matching results to tmp_docs_file; the file is read
# back line by line further down. The command is passed as an argument list.
190 cmd = ['mongoexport',
191 '--db', 'test_results_collection',
192 '--collection', 'results',
193 '--query', '{}'.format(query),
194 '--out', '{}'.format(tmp_docs_file)]
# check_call raises when mongoexport exits with a non-zero status.
196 subprocess.check_call(cmd)
# NOTE(review): "except Exception, err" is Python 2-only syntax;
# "except Exception as err" is accepted by Python 2.6+ and Python 3.
198 except Exception, err:
199 logger.error("export mongodb failed: %s" % err)
# Loads the documents already present in Elasticsearch for the same number of
# days, so that they can be skipped when publishing.
203 def get_existed_docs(self):
204 self.existed_docs = shared_utils.get_elastic_docs_by_days(self.elastic_url, self.creds, self.days)
# Each line of the export file is parsed as one JSON document, wrapped in a
# DocumentPublisher, formatted and published.
209 with open(tmp_docs_file) as fdocs:
210 for doc_line in fdocs:
211 DocumentPublisher(json.loads(doc_line),
215 self.to).format().publish()
# Remove the temporary export file if it exists.
221 if os.path.exists(tmp_docs_file):
222 os.remove(tmp_docs_file)
# Elasticsearch endpoint for the converted results: the fixed path below is
# joined onto the configured base URL.
226 base_elastic_url = urlparse.urljoin(CONF.elastic_url, '/test_results/mongo2elastic')
227 to = CONF.destination
228 days = args.latest_days
229 es_creds = CONF.elastic_creds
# The symbolic destination 'elasticsearch' is replaced by the endpoint URL;
# any other destination value is passed through unchanged.
231 if to == 'elasticsearch':
232 to = base_elastic_url
# One DocumentsPublisher run (export, load existing documents, publish) per
# test case of every project listed in the test-case YAML.
234 for project, case_dicts in testcases_parser.testcases_yaml.items():
235 for case_dict in case_dicts:
236 case = case_dict.get('name')
237 fmt = testcases_parser.compose_format(case_dict.get('format'))
238 DocumentsPublisher(project,
244 to).export().get_existed_docs().publish()
# Script entry point.
247 if __name__ == '__main__':