15 import mongo2elastic_format
# Module-level logger used by every function in this script.  Records from
# DEBUG upwards are written to a file under /var/log named after the logger.
logger = logging.getLogger('mongo_to_elasticsearch')
logger.setLevel(logging.DEBUG)

file_handler = logging.FileHandler('/var/log/{}.log'.format(logger.name))
file_handler.setFormatter(
    logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
logger.addHandler(file_handler)
24 def _fix_date(date_string):
25 if isinstance(date_string, dict):
26 return date_string['$date']
28 return date_string[:-3].replace(' ', 'T') + 'Z'
31 def verify_mongo_entry(testcase):
42 these fields must be present and must NOT be None
these fields will be preserved if they are NOT None
49 mandatory_fields = ['installer',
55 mandatory_fields_to_modify = {'start_date': _fix_date}
56 fields_to_swap_or_add = {'scenario': 'version'}
58 mongo_id = testcase['_id']
61 optional_fields = ['description']
62 for key, value in testcase.items():
63 if key in mandatory_fields:
65 # empty mandatory field, invalid input
66 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
67 " for mandatory field '{}'".format(mongo_id, key))
70 mandatory_fields.remove(key)
71 elif key in mandatory_fields_to_modify:
73 # empty mandatory field, invalid input
74 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
75 " for mandatory field '{}'".format(mongo_id, key))
78 testcase[key] = mandatory_fields_to_modify[key](value)
79 del mandatory_fields_to_modify[key]
80 elif key in fields_to_swap_or_add:
82 swapped_key = fields_to_swap_or_add[key]
83 swapped_value = testcase[swapped_key]
84 logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key, swapped_value))
85 testcase[key] = swapped_value
86 del fields_to_swap_or_add[key]
88 del fields_to_swap_or_add[key]
89 elif key in optional_fields:
91 # empty optional field, remove
93 optional_fields.remove(key)
98 if len(mandatory_fields) > 0:
99 # some mandatory fields are missing
100 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
101 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
103 elif len(mandatory_fields_to_modify) > 0:
104 # some mandatory fields are missing
105 logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
106 " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
109 if len(fields_to_swap_or_add) > 0:
110 for key, swap_key in fields_to_swap_or_add.iteritems():
111 testcase[key] = testcase[swap_key]
def modify_mongo_entry(testcase):
    """Verify *testcase* and run the formatter configured for it.

    1. verify and identify the testcase
    2. if modification is implemented, then use that
    3. if not, try to use default
    4. if 2 or 3 is successful, return True, otherwise return False

    The format name comes from ``conf_utils.get_format(project, case_name)``
    and is looked up as an attribute of the ``mongo2elastic_format`` module.

    :param testcase: test result dict; it is handed to ``verify_mongo_entry``
        and then to the selected formatter.
    :return: whatever the formatter returns on success; ``False`` when the
        entry does not verify, no format name is returned, or the lookup or
        the formatter raises.
    """
    if not verify_mongo_entry(testcase):
        return False

    project = testcase['project_name']
    case_name = testcase['case_name']
    fmt = conf_utils.get_format(project, case_name)
    if not fmt:
        # Nothing to look up; report failure instead of relying on the
        # KeyError the module lookup below would raise.
        return False

    try:
        logger.info("Processing %s/%s using format %s", project, case_name, fmt)
        return vars(mongo2elastic_format)[fmt](testcase)
    except Exception:
        # Best effort: one bad entry must not abort the whole run, so the
        # failure is logged with its traceback and reported to the caller.
        logger.error("Fail in modify testcase[%s]\nerror message: %s",
                     testcase, traceback.format_exc())
        return False
135 def publish_mongo_data(output_destination):
136 tmp_filename = 'mongo-{}.log'.format(uuid.uuid4())
138 subprocess.check_call(['mongoexport',
139 '--db', 'test_results_collection',
141 '--out', tmp_filename])
142 with open(tmp_filename) as fobj:
143 for mongo_json_line in fobj:
144 test_result = json.loads(mongo_json_line)
145 if modify_mongo_entry(test_result):
146 status, data = shared_utils.publish_json(test_result, es_creds, output_destination)
148 project = test_result['project_name']
149 case_name = test_result['case_name']
150 logger.info('project {} case {} publish failed, due to [{}]'
151 .format(project, case_name, json.loads(data)['error']['reason']))
153 if os.path.exists(tmp_filename):
154 os.remove(tmp_filename)
def get_mongo_data(days):
    """Export recent results from mongo and keep those that can be converted.

    Runs ``mongoexport`` on the ``results`` collection of the
    ``test_results_collection`` database with a query selecting documents
    whose ``start_date`` is greater than the current local time minus *days*
    days, then passes every exported JSON line through ``modify_mongo_entry``.

    :param days: size of the look-back window, in days.
    :return: list of the parsed entries for which ``modify_mongo_entry``
        returned a true value.
    :raises subprocess.CalledProcessError: if ``mongoexport`` exits non-zero.
    """
    past_time = datetime.datetime.today() - datetime.timedelta(days=days)
    query = '{{"start_date":{{$gt:"{}"}}}}'.format(past_time)
    mongo_json_lines = subprocess.check_output(
        ['mongoexport', '--db', 'test_results_collection', '-c', 'results',
         '--query', query]).splitlines()

    mongo_data = []
    for mongo_json_line in mongo_json_lines:
        test_result = json.loads(mongo_json_line)
        if modify_mongo_entry(test_result):
            # if the modification could be applied, append the modified result
            mongo_data.append(test_result)
    return mongo_data
def publish_difference(mongo_data, elastic_data, output_destination, es_creds):
    """Publish the entries of *mongo_data* that have no equal in *elastic_data*.

    *mongo_data* is modified in place: for each entry of *elastic_data*, one
    equal entry (if any) is removed from it.  The number of remaining entries
    is logged and each of them is passed to ``shared_utils.publish_json``.

    :param mongo_data: list of converted mongo entries; mutated by this call.
    :param elastic_data: iterable of entries to exclude.
    :param output_destination: forwarded to ``shared_utils.publish_json``.
    :param es_creds: forwarded to ``shared_utils.publish_json``.
    """
    for elastic_entry in elastic_data:
        # list.remove() already performs the membership scan; an absent
        # entry simply means there is nothing to drop.
        try:
            mongo_data.remove(elastic_entry)
        except ValueError:
            pass

    logger.info('number of parsed test results: {}'.format(len(mongo_data)))

    for parsed_test_result in mongo_data:
        shared_utils.publish_json(parsed_test_result, es_creds, output_destination)
183 if __name__ == '__main__':
184 parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
185 parser.add_argument('-od', '--output-destination',
186 default='elasticsearch',
187 choices=('elasticsearch', 'stdout'),
188 help='defaults to elasticsearch')
190 parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
191 help='get entries old at most N days from mongodb and'
192 ' parse those that are not already in elasticsearch.'
193 ' If not present, will get everything from mongodb, which is the default')
195 parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
196 help='the url of elasticsearch, defaults to http://localhost:9200')
198 parser.add_argument('-u', '--elasticsearch-username', default=None,
199 help='The username with password for elasticsearch in format username:password')
201 args = parser.parse_args()
202 base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
203 output_destination = args.output_destination
204 days = args.merge_latest
205 es_creds = args.elasticsearch_username
207 if output_destination == 'elasticsearch':
208 output_destination = base_elastic_url
210 # parsed_test_results will be printed/sent to elasticsearch
212 publish_mongo_data(output_destination)
223 elastic_data = shared_utils.get_elastic_data(base_elastic_url, es_creds, body)
224 logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_data)))
225 mongo_data = get_mongo_data(days)
226 publish_difference(mongo_data, elastic_data, output_destination, es_creds)
228 raise Exception('Update must be non-negative')