import os
import subprocess
import traceback
-import urlparse
import uuid
import argparse
-from common import logger_utils, elastic_access
-from conf import testcases
-from conf.config import APIConfig
-from mongo2elastic import format
+from dashboard.common import elastic_access
+from dashboard.common import logger_utils
+from dashboard.conf import testcases
+from dashboard.conf.config import APIConfig
+from dashboard.mongo2elastic import format
logger = logger_utils.DashboardLogger('mongo2elastic').get
metavar='N',
help='get entries old at most N days from mongodb and'
' parse those that are not already in elasticsearch.'
- ' If not present, will get everything from mongodb, which is the default')
+ ' If not present, will get everything from mongodb,'
+ ' which is the default')
args = parser.parse_args()
CONF = APIConfig().parse(args.config_file)
tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
-class DocumentPublisher:
+class DocumentVerification(object):
- def __init__(self, doc, fmt, exist_docs, creds, to):
+ def __init__(self, doc):
+ super(DocumentVerification, self).__init__()
+ self.doc = doc
+ self.doc_id = doc['_id'] if '_id' in doc else None
+ self.skip = False
+
+ def mandatory_fields_exist(self):
+ mandatory_fields = ['installer',
+ 'pod_name',
+ 'version',
+ 'case_name',
+ 'project_name',
+ 'details',
+ 'start_date',
+ 'scenario']
+ for key, value in self.doc.items():
+ if key in mandatory_fields:
+ if value is None:
+ logger.info("Skip testcase '%s' because field "
+ "'%s' missing" % (self.doc_id, key))
+ self.skip = True
+ else:
+ mandatory_fields.remove(key)
+ else:
+ del self.doc[key]
+
+ if len(mandatory_fields) > 0:
+ logger.info("Skip testcase '%s' because field(s) '%s' missing" %
+ (self.doc_id, mandatory_fields))
+ self.skip = True
+
+ return self
+
+ def modify_start_date(self):
+ field = 'start_date'
+ if field in self.doc:
+ self.doc[field] = self._fix_date(self.doc[field])
+
+ return self
+
+ def modify_scenario(self):
+ scenario = 'scenario'
+ version = 'version'
+
+ if (scenario not in self.doc) or \
+ (scenario in self.doc and self.doc[scenario] is None):
+ self.doc[scenario] = self.doc[version]
+
+ return self
+
+ def is_skip(self):
+ return self.skip
+
+ def _fix_date(self, date_string):
+ if date_string == 'None':
+ return None
+ if isinstance(date_string, dict):
+ return date_string['$date']
+ if 'T' not in date_string:
+ date_string = date_string[:-3].replace(' ', 'T')
+ if not date_string.endswith('Z'):
+ date_string += 'Z'
+
+ return date_string
+
+
+class DocumentPublisher(object):
+
+ def __init__(self, doc, fmt, exist_docs, creds, elastic_url):
self.doc = doc
self.fmt = fmt
self.creds = creds
self.exist_docs = exist_docs
- self.to = to
+ self.elastic_url = elastic_url
self.is_formatted = True
def format(self):
self._publish()
def _publish(self):
- status, data = elastic_access.publish_docs(self.doc, self.creds, self.to)
+ status, data = elastic_access.publish_docs(
+ self.elastic_url, self.creds, self.doc)
if status > 300:
logger.error('Publish record[{}] failed, due to [{}]'
- .format(self.doc, json.loads(data)['error']['reason']))
+ .format(self.doc,
+ json.loads(data)['error']['reason']))
def _fix_date(self, date_string):
if isinstance(date_string, dict):
return date_string[:-3].replace(' ', 'T') + 'Z'
def _verify_document(self):
- """
- Mandatory fields:
- installer
- pod_name
- version
- case_name
- date
- project
- details
-
- these fields must be present and must NOT be None
-
- Optional fields:
- description
-
- these fields will be preserved if the are NOT None
- """
- mandatory_fields = ['installer',
- 'pod_name',
- 'version',
- 'case_name',
- 'project_name',
- 'details']
- mandatory_fields_to_modify = {'start_date': self._fix_date}
- fields_to_swap_or_add = {'scenario': 'version'}
- if '_id' in self.doc:
- mongo_id = self.doc['_id']
- else:
- mongo_id = None
- optional_fields = ['description']
- for key, value in self.doc.items():
- if key in mandatory_fields:
- if value is None:
- # empty mandatory field, invalid input
- logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
- " for mandatory field '{}'".format(mongo_id, key))
- return False
- else:
- mandatory_fields.remove(key)
- elif key in mandatory_fields_to_modify:
- if value is None:
- # empty mandatory field, invalid input
- logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
- " for mandatory field '{}'".format(mongo_id, key))
- return False
- else:
- self.doc[key] = mandatory_fields_to_modify[key](value)
- del mandatory_fields_to_modify[key]
- elif key in fields_to_swap_or_add:
- if value is None:
- swapped_key = fields_to_swap_or_add[key]
- swapped_value = self.doc[swapped_key]
- logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key,
- swapped_value))
- self.doc[key] = swapped_value
- del fields_to_swap_or_add[key]
- else:
- del fields_to_swap_or_add[key]
- elif key in optional_fields:
- if value is None:
- # empty optional field, remove
- del self.doc[key]
- optional_fields.remove(key)
- else:
- # unknown field
- del self.doc[key]
-
- if len(mandatory_fields) > 0:
- # some mandatory fields are missing
- logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
- " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
- return False
- elif len(mandatory_fields_to_modify) > 0:
- # some mandatory fields are missing
- logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
- " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
- return False
- else:
- if len(fields_to_swap_or_add) > 0:
- for key, swap_key in fields_to_swap_or_add.iteritems():
- self.doc[key] = self.doc[swap_key]
+ return not (DocumentVerification(self.doc)
+ .modify_start_date()
+ .modify_scenario()
+ .mandatory_fields_exist()
+ .is_skip())
- return True
+class DocumentsPublisher(object):
-class DocumentsPublisher:
-
- def __init__(self, project, case, fmt, days, elastic_url, creds, to):
+ def __init__(self, project, case, fmt, days, elastic_url, creds):
self.project = project
self.case = case
self.fmt = fmt
self.days = days
self.elastic_url = elastic_url
self.creds = creds
- self.to = to
self.existed_docs = []
def export(self):
if self.days > 0:
- past_time = datetime.datetime.today() - datetime.timedelta(days=self.days)
+ past_time = datetime.datetime.today(
+ ) - datetime.timedelta(days=self.days)
query = '''{{
"project_name": "{}",
"case_name": "{}",
try:
subprocess.check_call(cmd)
return self
- except Exception, err:
+ except Exception as err:
logger.error("export mongodb failed: %s" % err)
self._remove()
exit(-1)
- def get_existed_docs(self):
+ def get_exists(self):
if self.days == 0:
body = '''{{
"query": {{
}}'''.format(self.project, self.case, self.days)
else:
raise Exception('Update days must be non-negative')
- self.existed_docs = elastic_access.get_docs(self.elastic_url, self.creds, body)
+ self.existed_docs = elastic_access.get_docs(
+ self.elastic_url, self.creds, body)
return self
def publish(self):
+ fdocs = None
try:
with open(tmp_docs_file) as fdocs:
for doc_line in fdocs:
self.fmt,
self.existed_docs,
self.creds,
- self.to).format().publish()
+ self.elastic_url).format().publish()
finally:
- fdocs.close()
+ if fdocs:
+ fdocs.close()
self._remove()
def _remove(self):
def main():
- base_elastic_url = urlparse.urljoin(CONF.elastic_url, '/test_results/mongo2elastic')
- to = CONF.destination
- days = args.latest_days
- es_creds = CONF.elastic_creds
-
- if to == 'elasticsearch':
- to = base_elastic_url
-
for project, case_dicts in testcases.testcases_yaml.items():
for case_dict in case_dicts:
case = case_dict.get('name')
DocumentsPublisher(project,
case,
fmt,
- days,
- base_elastic_url,
- es_creds,
- to).export().get_existed_docs().publish()
+ args.latest_days,
+ CONF.index_url,
+ CONF.es_creds).export().get_exists().publish()