15 import mongo2elastic_format
logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get

# Command-line interface. The options are parsed once, when the module is
# loaded; the resulting ``args`` namespace is read by the ``__main__`` block.
parser = argparse.ArgumentParser(
    description='Modify and filter mongo json data for elasticsearch')

# Where formatted documents go: the elasticsearch index or stdout.
parser.add_argument(
    '-od', '--output-destination',
    choices=('elasticsearch', 'stdout'),
    default='elasticsearch',
    help='defaults to elasticsearch')

# Age limit (in days) for the mongo export; per the help text, leaving it
# at the default exports everything.
parser.add_argument(
    '-ml', '--merge-latest',
    type=int,
    metavar='N',
    default=0,
    help=('get entries old at most N days from mongodb and'
          ' parse those that are not already in elasticsearch.'
          ' If not present, will get everything from mongodb, which is the default'))

# Base URL of the elasticsearch server.
parser.add_argument(
    '-e', '--elasticsearch-url',
    default='http://localhost:9200',
    help='the url of elasticsearch, defaults to http://localhost:9200')

# Optional elasticsearch credentials, given as 'username:password'.
parser.add_argument(
    '-u', '--elasticsearch-username',
    default=None,
    help='The username with password for elasticsearch in format username:password')

args = parser.parse_args()

# Scratch file that receives the mongoexport output; the uuid4 suffix gives
# every run its own file name.
tmp_docs_file = './mongo-%s.json' % uuid.uuid4()
# DocumentPublisher: validates/normalises one mongo test-result document
# (``doc``, a dict built by the caller from one mongoexport JSON line), runs
# the formatter named ``fmt`` from ``mongo2elastic_format`` on it, and
# publishes it through ``shared_utils.publish_json`` unless it is already in
# ``exist_docs``.
# NOTE(review): this excerpt is missing several lines of the class (see the
# gaps in the embedded line numbers); the comments below describe only the
# visible statements.
42 class DocumentPublisher:
# The other methods read self.doc, self.fmt, self.creds and self.to, so the
# hidden lines presumably assign them from the matching parameters.
# ``is_formatted`` starts True and is recomputed by the formatting step.
44 def __init__(self, doc, fmt, exist_docs, creds, to):
48 self.exist_docs = exist_docs
50 self.is_formatted = True
# Formatting step: only a document that passes _verify_document() and has a
# formatter name is passed to the function named ``self.fmt`` in
# mongo2elastic_format (looked up through vars(), so an unknown name raises
# KeyError); that function's return value becomes ``is_formatted``.
# Otherwise ``is_formatted`` is set to False (presumably the else branch).
# The error path logs the document plus the current traceback and also sets
# ``is_formatted`` to False (presumably an except handler; its clause is not
# visible here).
54 if self._verify_document() and self.fmt:
55 self.is_formatted = vars(mongo2elastic_format)[self.fmt](self.doc)
57 self.is_formatted = False
59 logger.error("Fail in format testcase[%s]\nerror message: %s" %
60 (self.doc, traceback.format_exc()))
61 self.is_formatted = False
# Publish only documents that were formatted and are not already known.
# NOTE(review): if ``exist_docs`` is a list, this is a linear scan comparing
# whole dicts for every document; confirm the type returned by
# shared_utils.get_elastic_docs_by_days.
66 if self.is_formatted and self.doc not in self.exist_docs:
# publish_json returns a (status, data) pair. In the failure branch (its
# condition is not visible) the error reason is read from ``data`` as JSON.
# NOTE(review): json.loads(data)['error']['reason'] raises if the response
# body is not JSON of that shape, which would hide the original failure.
70 status, data = shared_utils.publish_json(self.doc, self.creds, self.to)
72 logger.error('Publish record[{}] failed, due to [{}]'
73 .format(self.doc, json.loads(data)['error']['reason']))
# _fix_date: normalise a start_date value.
#  - A dict (presumably mongo extended JSON, {"$date": ...}) yields its
#    '$date' entry unchanged.
#  - Otherwise the string loses its last 3 characters, every space becomes
#    'T', and 'Z' is appended. This assumes input like
#    'YYYY-MM-DD HH:MM:SS.ffffff' (microseconds trimmed to milliseconds);
#    TODO confirm the stored format.
75 def _fix_date(self, date_string):
76 if isinstance(date_string, dict):
77 return date_string['$date']
79 return date_string[:-3].replace(' ', 'T') + 'Z'
# _verify_document: validates self.doc and normalises it in place. Its
# result gates the formatter call in the formatting step. Visible rules:
#  - every name in mandatory_fields must be present and non-None;
#  - 'start_date' must be present and non-None and is rewritten by _fix_date;
#  - 'scenario' takes the value of 'version' when it is None (the guarding
#    condition is not visible) or when it is absent;
#  - 'description' is optional.
# Each missing/invalid case is logged at info level with the mongo _id.
81 def _verify_document(self):
92 these fields must be present and must NOT be None
97 these fields will be preserved if the are NOT None
99 mandatory_fields = ['installer',
105 mandatory_fields_to_modify = {'start_date': self._fix_date}
106 fields_to_swap_or_add = {'scenario': 'version'}
# NOTE(review): mongo_id is only bound here when the document has an '_id';
# confirm the (not visible) alternative branch binds it too, otherwise the
# log calls below raise NameError.
107 if '_id' in self.doc:
108 mongo_id = self.doc['_id']
111 optional_fields = ['description']
# Walk the document's fields. Each branch removes the rule it satisfied, so
# rules still present after the loop correspond to fields the document lacks.
112 for key, value in self.doc.items():
113 if key in mandatory_fields:
115 # empty mandatory field, invalid input
116 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
117 " for mandatory field '{}'".format(mongo_id, key))
120 mandatory_fields.remove(key)
121 elif key in mandatory_fields_to_modify:
123 # empty mandatory field, invalid input
124 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
125 " for mandatory field '{}'".format(mongo_id, key))
128 self.doc[key] = mandatory_fields_to_modify[key](value)
129 del mandatory_fields_to_modify[key]
# Fill the field from its source field (e.g. 'scenario' <- 'version').
# NOTE(review): self.doc[swapped_key] raises KeyError if the source field is
# absent.
130 elif key in fields_to_swap_or_add:
132 swapped_key = fields_to_swap_or_add[key]
133 swapped_value = self.doc[swapped_key]
134 logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key,
136 self.doc[key] = swapped_value
137 del fields_to_swap_or_add[key]
139 del fields_to_swap_or_add[key]
140 elif key in optional_fields:
142 # empty optional field, remove
144 optional_fields.remove(key)
# Leftover rules mean the document lacked those fields entirely.
149 if len(mandatory_fields) > 0:
150 # some mandatory fields are missing
151 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
152 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
154 elif len(mandatory_fields_to_modify) > 0:
155 # some mandatory fields are missing
156 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
157 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
# Absent swap/add fields are copied from their source field.
# NOTE(review): dict.iteritems() exists only on Python 2; use .items() when
# porting to Python 3.
160 if len(fields_to_swap_or_add) > 0:
161 for key, swap_key in fields_to_swap_or_add.iteritems():
162 self.doc[key] = self.doc[swap_key]
# DocumentsPublisher: for one (project, case) pair it
#  1. exports the matching mongo results to tmp_docs_file with mongoexport,
#  2. fetches the documents already in elasticsearch,
#  3. runs every exported line through DocumentPublisher.
# The __main__ block chains export().get_existed_docs().publish(), so these
# methods presumably return the publisher itself.
# NOTE(review): parts of this class are not visible in this excerpt (see the
# gaps in the embedded line numbers).
167 class DocumentsPublisher:
169 def __init__(self, project, case, fmt, days, elastic_url, creds, to):
170 self.project = project
174 self.elastic_url = elastic_url
177 self.existed_docs = []
# Build the mongoexport --query for this project/case. The first variant
# also requires start_date to be newer than ``days`` days ago; the condition
# choosing between the variants is not visible here.
# NOTE(review): this reads the bare name ``days``. Unless this line is inside
# __init__ (whose parameter is ``days``), it resolves to the module-level
# ``days`` set in the __main__ block, not to per-instance state. Confirm, and
# prefer an instance attribute.
# NOTE(review): past_time is interpolated as str(datetime), i.e. a
# 'YYYY-MM-DD HH:MM:SS.ffffff' string, so the $gt is presumably a string
# comparison. It only orders correctly if start_date is stored the same way.
181 past_time = datetime.datetime.today() - datetime.timedelta(days=days)
183 "project_name": "{}",
185 "start_date": {{"$gt" : "{}"}}
186 }}'''.format(self.project, self.case, past_time)
189 "project_name": "{}",
191 }}'''.format(self.project, self.case)
# Export db 'test_results_collection', collection 'results', to
# tmp_docs_file; the file is later read back one JSON document per line.
192 cmd = ['mongoexport',
193 '--db', 'test_results_collection',
194 '--collection', 'results',
195 '--query', '{}'.format(query),
196 '--out', '{}'.format(tmp_docs_file)]
198 subprocess.check_call(cmd)
# NOTE(review): ``except Exception, err`` is Python 2-only syntax; use
# ``except Exception as err`` (also valid on Python 2.6+).
200 except Exception, err:
201 logger.error("export mongodb failed: %s" % err)
# Fetch documents already published to elasticsearch so DocumentPublisher
# can skip duplicates.
# NOTE(review): ``days`` is neither a parameter of this method nor read from
# ``self``. The only visible binding is the module-level ``days`` assigned in
# the __main__ block, so this raises NameError when the class is used from
# another module. Use the instance's own value (e.g. self.days, if __init__
# stores it) -- TODO confirm.
205 def get_existed_docs(self):
206 self.existed_docs = shared_utils.get_elastic_docs_by_days(self.elastic_url, self.creds, days)
# Feed each exported line (one JSON document) through DocumentPublisher's
# format-then-publish chain.
211 with open(tmp_docs_file) as fdocs:
212 for doc_line in fdocs:
213 DocumentPublisher(json.loads(doc_line),
217 self.to).format().publish()
# Remove the temporary export file if it exists.
223 if os.path.exists(tmp_docs_file):
224 os.remove(tmp_docs_file)
# Script entry point. For every project / test case listed in
# conf_utils.testcases_yaml, export its mongo results, skip those already
# present and publish the rest to ``to``: the elasticsearch index URL, or
# the literal 'stdout'.
227 if __name__ == '__main__':
# NOTE: because the second argument starts with '/', urljoin keeps only the
# scheme and host:port of --elasticsearch-url; any path it carries is
# discarded.
# NOTE(review): ``urlparse`` is the Python 2 module name (urllib.parse on
# Python 3).
228 base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
229 to = args.output_destination
# ``days`` is module-level here; DocumentsPublisher.get_existed_docs reads
# this global directly rather than through its instance.
230 days = args.merge_latest
231 es_creds = args.elasticsearch_username
# 'stdout' is passed through unchanged; 'elasticsearch' becomes the index URL.
233 if to == 'elasticsearch':
234 to = base_elastic_url
236 for project, case_dicts in conf_utils.testcases_yaml.items():
237 for case_dict in case_dicts:
238 case = case_dict.get('name')
239 fmt = conf_utils.compose_format(case_dict.get('format'))
240 DocumentsPublisher(project,
246 to).export().get_existed_docs().publish()