13 from common import logger_utils, elastic_access
14 from conf import testcases
15 from conf.config import APIConfig
16 from mongo2elastic import format
# Module-wide logger taken from DashboardLogger('mongo2elastic').
18 logger = logger_utils.DashboardLogger('mongo2elastic').get
# Command-line options: -c/--config-file and -ld/--latest-days.
20 parser = argparse.ArgumentParser()
21 parser.add_argument("-c", "--config-file",
23 help="Config file location")
24 parser.add_argument('-ld', '--latest-days',
28 help='get entries old at most N days from mongodb and'
29 ' parse those that are not already in elasticsearch.'
30 ' If not present, will get everything from mongodb, which is the default')
# NOTE(review): the arguments are parsed and the configuration is loaded by
# plain module-level statements - presumably this also runs when the module is
# imported, not only when it is executed as a script; confirm.
32 args = parser.parse_args()
33 CONF = APIConfig().parse(args.config_file)
# File that receives the mongoexport output and is read back when publishing;
# the uuid4 component gives every run its own file name.
36 tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
# Verifies, formats and publishes one document to Elasticsearch.
39 class DocumentPublisher:
# doc is the document to publish, fmt the name of the formatter looked up in
# the `format` module, exist_docs the documents that must not be published
# again; creds and elastic_url are handed to elastic_access.publish_docs.
41 def __init__(self, doc, fmt, exist_docs, creds, elastic_url):
45 self.exist_docs = exist_docs
46 self.elastic_url = elastic_url
# Starts out True; set to False when verification or formatting fails.
47 self.is_formatted = True
# The document is verified first; when it is valid and a format name is set,
# the formatter with that name is looked up in the `format` module namespace
# and applied to the document, and its return value becomes is_formatted.
51 if self._verify_document() and self.fmt:
52 self.is_formatted = vars(format)[self.fmt](self.doc)
54 self.is_formatted = False
# Failure path: the document and the current traceback are logged and the
# document is marked as not formatted.
56 logger.error("Fail in format testcase[%s]\nerror message: %s" %
57 (self.doc, traceback.format_exc()))
58 self.is_formatted = False
# A document is eligible for publishing only if it was formatted successfully
# and is not already among the existing documents.
63 if self.is_formatted and self.doc not in self.exist_docs:
# publish_docs returns a (status, data) pair.
67 status, data = elastic_access.publish_docs(self.elastic_url, self.creds, self.doc)
# The response body is parsed as JSON and its error.reason is logged.
# NOTE(review): assumes a failed response always carries a JSON body with
# error.reason - confirm against elastic_access.publish_docs.
69 logger.error('Publish record[{}] failed, due to [{}]'
70 .format(self.doc, json.loads(data)['error']['reason']))
72 def _fix_date(self, date_string):
73 if isinstance(date_string, dict):
74 return date_string['$date']
76 return date_string[:-3].replace(' ', 'T') + 'Z'
78 def _verify_document(self):
89 these fields must be present and must NOT be None
94 these fields will be preserved if they are NOT None
# Rule tables for the checks below. Every key found in the document is removed
# from its table, so whatever is left after the loop was absent from the
# document.
96 mandatory_fields = ['installer',
# Mandatory fields whose value is rewritten in place by the mapped function
# (start_date is passed through _fix_date).
102 mandatory_fields_to_modify = {'start_date': self._fix_date}
# Maps a field to the field whose value it takes over: scenario falls back to
# version.
103 fields_to_swap_or_add = {'scenario': 'version'}
# mongo_id is only used to identify the document in the log messages.
104 if '_id' in self.doc:
105 mongo_id = self.doc['_id']
108 optional_fields = ['description']
# NOTE(review): on Python 2 items() returns a list copy, so self.doc can be
# changed while looping; on Python 3 it is a live view - confirm before porting.
109 for key, value in self.doc.items():
110 if key in mandatory_fields:
112 # empty mandatory field, invalid input
113 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
114 " for mandatory field '{}'".format(mongo_id, key))
117 mandatory_fields.remove(key)
118 elif key in mandatory_fields_to_modify:
120 # empty mandatory field, invalid input
121 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
122 " for mandatory field '{}'".format(mongo_id, key))
125 self.doc[key] = mandatory_fields_to_modify[key](value)
126 del mandatory_fields_to_modify[key]
127 elif key in fields_to_swap_or_add:
# The field is present; judging by the log message, its None value is replaced
# by the value of the mapped field.
129 swapped_key = fields_to_swap_or_add[key]
130 swapped_value = self.doc[swapped_key]
131 logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key,
133 self.doc[key] = swapped_value
134 del fields_to_swap_or_add[key]
136 del fields_to_swap_or_add[key]
137 elif key in optional_fields:
139 # empty optional field, remove
141 optional_fields.remove(key)
# Entries still present in the mandatory tables never appeared in the document.
146 if len(mandatory_fields) > 0:
147 # some mandatory fields are missing
148 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
149 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
151 elif len(mandatory_fields_to_modify) > 0:
152 # some mandatory fields are missing
153 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
154 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
# Swap/add fields that were absent from the document are added with the value
# of their mapped field.
157 if len(fields_to_swap_or_add) > 0:
# NOTE(review): dict.iteritems() exists only on Python 2; items() works on both.
158 for key, swap_key in fields_to_swap_or_add.iteritems():
159 self.doc[key] = self.doc[swap_key]
# Exports the results of one project/test case from MongoDB with mongoexport
# and publishes each exported document through DocumentPublisher.
164 class DocumentsPublisher:
# project and case select the results to export; days limits them to the most
# recent N days; elastic_url and creds are used for the Elasticsearch calls.
166 def __init__(self, project, case, fmt, days, elastic_url, creds):
167 self.project = project
171 self.elastic_url = elastic_url
# Replaced by the result of elastic_access.get_docs in get_existed_docs().
173 self.existed_docs = []
# Lower bound for start_date: the current local time minus self.days days.
# The query is limited to this project and case, with the start_date bound
# added in the first variant only.
177 past_time = datetime.datetime.today() - datetime.timedelta(days=self.days)
179 "project_name": "{}",
181 "start_date": {{"$gt" : "{}"}}
182 }}'''.format(self.project, self.case, past_time)
185 "project_name": "{}",
187 }}'''.format(self.project, self.case)
# mongoexport writes the matching documents of the `results` collection in
# database `test_results_collection` to tmp_docs_file.
188 cmd = ['mongoexport',
189 '--db', 'test_results_collection',
190 '--collection', 'results',
191 '--query', '{}'.format(query),
192 '--out', '{}'.format(tmp_docs_file)]
# check_call raises CalledProcessError when mongoexport exits non-zero.
194 subprocess.check_call(cmd)
# NOTE(review): `except Exception, err` is Python 2-only syntax;
# `except Exception as err` is accepted by Python 2.6+ and Python 3.
196 except Exception, err:
197 logger.error("export mongodb failed: %s" % err)
# Load the documents Elasticsearch already holds for this project/case into
# self.existed_docs. Two query bodies are built: one matching project_name and
# case_name only, one that additionally requires start_date >= now-<days>d.
201 def get_existed_docs(self):
207 {{ "match": {{ "project_name": "{}" }} }},
208 {{ "match": {{ "case_name": "{}" }} }}
212 }}'''.format(self.project, self.case)
218 {{ "match": {{ "project_name": "{}" }} }},
219 {{ "match": {{ "case_name": "{}" }} }}
223 "start_date": {{ "gte": "now-{}d" }}
228 }}'''.format(self.project, self.case, self.days)
# A negative day count is rejected.
230 raise Exception('Update days must be non-negative')
231 self.existed_docs = elastic_access.get_docs(self.elastic_url, self.creds, body)
# The export file is read line by line; each line is parsed as one JSON
# document, which is then formatted and published on its own.
236 with open(tmp_docs_file) as fdocs:
237 for doc_line in fdocs:
238 DocumentPublisher(json.loads(doc_line),
242 self.elastic_url).format().publish()
# Delete the temporary export file if it exists.
248 if os.path.exists(tmp_docs_file):
249 os.remove(tmp_docs_file)
# The second urljoin argument starts with '/', so it replaces any path already
# present in CONF.elastic_url.
253 base_elastic_url = urlparse.urljoin(CONF.elastic_url, '/test_results/mongo2elastic')
254 days = args.latest_days
255 es_creds = CONF.elastic_creds
# For every test case of every project: export from MongoDB, fetch the existing
# Elasticsearch documents, then publish.
257 for project, case_dicts in testcases.testcases_yaml.items():
258 for case_dict in case_dicts:
259 case = case_dict.get('name')
260 fmt = testcases.compose_format(case_dict.get('format'))
261 DocumentsPublisher(project,
266 es_creds).export().get_existed_docs().publish()