14 import mongo2elastic_format
# Logger object used by the functions below.
logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get

# Path of the scratch file the mongo export is written to; the name
# embeds a random UUID.
tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())

# Command-line interface of the script.
parser = argparse.ArgumentParser(
    description='Modify and filter mongo json data for elasticsearch')

# Where converted documents go.
parser.add_argument(
    '-od', '--output-destination',
    choices=('elasticsearch', 'stdout'),
    default='elasticsearch',
    help='defaults to elasticsearch')

# Age limit, in days, for the entries pulled from mongodb.
parser.add_argument(
    '-ml', '--merge-latest',
    type=int,
    metavar='N',
    default=0,
    help='get entries old at most N days from mongodb and'
         ' parse those that are not already in elasticsearch.'
         ' If not present, will get everything from mongodb, which is the default')

# Elasticsearch endpoint and credentials.
parser.add_argument(
    '-e', '--elasticsearch-url',
    default='http://localhost:9200',
    help='the url of elasticsearch, defaults to http://localhost:9200')
parser.add_argument(
    '-u', '--elasticsearch-username',
    default=None,
    help='The username with password for elasticsearch in format username:password')

args = parser.parse_args()
41 def _fix_date(date_string):
42 if isinstance(date_string, dict):
43 return date_string['$date']
45 return date_string[:-3].replace(' ', 'T') + 'Z'
48 def verify_document(testcase):
59 these fields must be present and must NOT be None
64 these fields will be preserved if they are NOT None
66 mandatory_fields = ['installer',
72 mandatory_fields_to_modify = {'start_date': _fix_date}
73 fields_to_swap_or_add = {'scenario': 'version'}
75 mongo_id = testcase['_id']
78 optional_fields = ['description']
79 for key, value in testcase.items():
80 if key in mandatory_fields:
82 # empty mandatory field, invalid input
83 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
84 " for mandatory field '{}'".format(mongo_id, key))
87 mandatory_fields.remove(key)
88 elif key in mandatory_fields_to_modify:
90 # empty mandatory field, invalid input
91 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
92 " for mandatory field '{}'".format(mongo_id, key))
95 testcase[key] = mandatory_fields_to_modify[key](value)
96 del mandatory_fields_to_modify[key]
97 elif key in fields_to_swap_or_add:
99 swapped_key = fields_to_swap_or_add[key]
100 swapped_value = testcase[swapped_key]
101 logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key, swapped_value))
102 testcase[key] = swapped_value
103 del fields_to_swap_or_add[key]
105 del fields_to_swap_or_add[key]
106 elif key in optional_fields:
108 # empty optional field, remove
110 optional_fields.remove(key)
115 if len(mandatory_fields) > 0:
116 # some mandatory fields are missing
117 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
118 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
120 elif len(mandatory_fields_to_modify) > 0:
121 # some mandatory fields are missing
122 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
123 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
126 if len(fields_to_swap_or_add) > 0:
127 for key, swap_key in fields_to_swap_or_add.iteritems():
128 testcase[key] = testcase[swap_key]
def format_document(testcase):
    """Verify ``testcase`` and run the formatter configured for it.

    Steps:
      1. verify the testcase with ``verify_document`` (which also rewrites
         some of its fields in place)
      2. look up the format name for its project / case name
      3. call the function of that name from ``mongo2elastic_format``

    :param testcase: dict loaded from one exported mongo document.
    :return: the formatter's result when steps 1-3 succeed, ``False`` on
        every failure path (verification failed, no format name, or the
        formatter raised).
    """
    if not verify_document(testcase):
        return False

    project = testcase['project_name']
    case_name = testcase['case_name']
    fmt = conf_utils.get_format(project, case_name)
    if not fmt:
        # No format name available for this project/case: nothing to run.
        return False

    try:
        logger.info("Processing %s/%s using format %s" % (project, case_name, fmt))
        # The format name is resolved to a module-level callable by name.
        return vars(mongo2elastic_format)[fmt](testcase)
    except Exception:
        # One bad document must not stop the whole run: log the traceback
        # and report the failure to the caller.
        logger.error("Fail in format testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
        return False
def export_documents(days):
    """Dump the mongo ``results`` collection into ``tmp_docs_file``.

    Runs ``mongoexport`` against the ``test_results_collection`` database.

    :param days: when greater than 0, only documents whose ``start_date``
        is later than now minus ``days`` days are exported; otherwise no
        query is passed and the whole collection is exported.
    :raises SystemExit: if the export command fails. The later stages read
        the export file, so there is nothing useful to continue with.
    """
    cmd = ['mongoexport', '--db', 'test_results_collection', '-c', 'results']
    if days > 0:
        past_time = datetime.datetime.today() - datetime.timedelta(days=days)
        # Doubled braces are literal braces for str.format.
        cmd += ['--query', '{{"start_date":{{$gt:"{}"}}}}'.format(past_time)]
    cmd += ['--out', tmp_docs_file]

    try:
        subprocess.check_call(cmd)
    except Exception as err:
        logger.error("export mongodb failed: %s" % err)
        raise SystemExit(-1)
167 def publish_document(document, es_creds, to):
168 status, data = shared_utils.publish_json(document, es_creds, to)
170 logger.error('Publish record[{}] failed, due to [{}]'
171 .format(document, json.loads(data)['error']['reason']))
def publish_nonexist_documents(elastic_docs, es_creds, to):
    """Publish every exported document that is not yet in elasticsearch.

    Reads ``tmp_docs_file`` one JSON document per line, formats each
    document with ``format_document`` and publishes those that format
    successfully and are not contained in ``elastic_docs``.

    :param elastic_docs: documents already present in elasticsearch.
    :param es_creds: credentials passed through to ``publish_document``.
    :param to: destination passed through to ``publish_document``.

    The export file is removed when the function finishes, including when
    reading, parsing or publishing raises.
    """
    try:
        with open(tmp_docs_file) as fdocs:
            for doc_line in fdocs:
                doc = json.loads(doc_line)
                # format_document runs first: it rewrites ``doc`` in place,
                # and the membership test compares the rewritten form.
                if format_document(doc) and doc not in elastic_docs:
                    publish_document(doc, es_creds, to)
    finally:
        # Always clean up the scratch file, even on failure.
        if os.path.exists(tmp_docs_file):
            os.remove(tmp_docs_file)
if __name__ == '__main__':
    # Settings taken from the command line.
    es_creds = args.elasticsearch_username
    days = args.merge_latest
    base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')

    # 'elasticsearch' stands for the index URL built above; any other
    # destination value is passed on unchanged.
    to = base_elastic_url if args.output_destination == 'elasticsearch' else args.output_destination

    # 1. dump mongo, 2. fetch what elasticsearch already holds for the same
    # period, 3. publish the documents that are missing there.
    export_documents(days)
    elastic_docs = shared_utils.get_elastic_docs_by_days(base_elastic_url, es_creds, days)
    hit_count = len(elastic_docs)
    logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, hit_count))
    publish_nonexist_documents(elastic_docs, es_creds, to)