X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?p=bottlenecks.git;a=blobdiff_plain;f=test-scheduler%2Fserver%2Fsrc%2Fconductor_processor%2Fworkflow.py;fp=testing-scheduler%2Fserver%2Fsrc%2Fconductor_processor%2Fworkflow.py;h=da67e013b80e8f2a55e019b65540b5ca65ca41de;hp=19f0896c42bcca819bcff0ebff9934db63c83728;hb=e32043f58a2450b6a5986dc2a01f64f8b22c3992;hpb=a09bbea983aca3e437e254566da98196177748d9 diff --git a/testing-scheduler/server/src/conductor_processor/workflow.py b/test-scheduler/server/src/conductor_processor/workflow.py similarity index 97% rename from testing-scheduler/server/src/conductor_processor/workflow.py rename to test-scheduler/server/src/conductor_processor/workflow.py index 19f0896c..da67e013 100644 --- a/testing-scheduler/server/src/conductor_processor/workflow.py +++ b/test-scheduler/server/src/conductor_processor/workflow.py @@ -1,243 +1,243 @@ -############################################################################## -# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## - -import random -import collections -import re -from src.conductor_processor.task import TaskFile - - -class WorkflowFile(object): - def __init__(self, name): - self._name = "workflow_" + name + "(%s)" % getRandString(10) - self._description = '' - self._version = 1 - self._schemaVersion = 2 - self._tasks = [] - self._outputParameters = {} - - def getDict(self): - d = collections.OrderedDict() - d['name'] = self._name - d['description'] = self._description - d['version'] = self._version - d['schemaVersion'] = self._schemaVersion - d['tasks'] = self._tasks - d['outputParameters'] = self._outputParameters - - return d - - def generateMetaData(self, flowList, stepObjArr): - flowParser = FlowParser(flowList, stepObjArr) - self._tasks, taskMetaList = flowParser.parseMainFlow() - normalTasks = flowParser.getNormalTaskList() - for normalTask in normalTasks: - taskName = normalTask['name'] - referenceName = normalTask['taskReferenceName'] - self._outputParameters["%s(%s)" % (taskName, referenceName)] = \ - "${%s.output.response.body}" % referenceName - return self.getDict(), taskMetaList - - -class FlowParser(object): - def __init__(self, flowList, stepObjArr): - self._mainFlow = {} - self._subFlowDict = {} - self._stepObjArr = stepObjArr - self._normalTasks = [] - for flow in flowList: - if flow['name'] == "main": - self._mainFlow = flow - else: - self._subFlowDict[flow['name']] = flow - - def parseMainFlow(self): - return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr) - - def parse(self, obj, stepObjArr): - if isinstance(obj, str): - return self.parseFlow(obj, stepObjArr) - else: - return self.parseOrderList(obj, stepObjArr) - - def parseFlow(self, flowName, stepObjArr): - orderList = self._subFlowDict[flowName]['orders'] - return self.parseOrderList(orderList, stepObjArr) - - def parseOrderList(self, orderList, stepObjArr): - tasks = [] - taskMetaAllList = [] - for order in orderList: - if order['type'] == "normal": - genTask = NormalTask(order, stepObjArr, self) - self._normalTasks.append(genTask) - elif order['type'] == "switch": - genTask = SwitchTask(order, stepObjArr, self) - elif order['type'] == "parallel": - genTask = ParallelTask(order, stepObjArr, self) - tasks.append(genTask.getDict()) - - if order['type'] == "parallel": - joinTask = genTask.getJoinTask() - tasks.append(joinTask.getDict()) - - taskMetaList = genTask.getTaskMetaList() - if taskMetaList is not None: - taskMetaAllList.extend(taskMetaList) - return tasks, taskMetaAllList - - def getNormalTaskList(self): - normalTasksDict = [] - for normalTask in self._normalTasks: - normalTasksDict.append(normalTask.getDict()) - return normalTasksDict - - def getNormalTask(self, stepId): - for normalTask in self._normalTasks: - if normalTask.getStepId() == stepId: - return normalTask - return None - - -class BaseWorkflowTask(object): - def __init__(self, name): - self._name = name - self._taskReferenceName = self._name + "_task_%s" % getRandString(10) - self._type = '' - self._args = {} - - def __str__(self): - dictObj = self.getDict() - return str(dictObj) - - def getDict(self): - d1 = { - "name": self._name, - "taskReferenceName": self._taskReferenceName, - "type": self._type - } - return dict(d1, **self._args) - - def getName(self): - return self._name - - def getReferenceName(self): - return self._taskReferenceName - - def getTaskMetaList(self): - taskFile = TaskFile() - return [taskFile.generateFromStep(self)] - - -class NormalTask(BaseWorkflowTask): - def __init__(self, order, stepObjArr, flowParser): - relatedStepObj = stepObjArr[order['step'] - 1] - super(NormalTask, self).__init__(relatedStepObj.getName()) - self._taskReferenceName = "task_%s" % getRandString(10) - self._stepId = relatedStepObj.getId() - self._type = "HTTP" - self._args['inputParameters'] = relatedStepObj.getArgs() - self._paramTransform(self._args['inputParameters'], flowParser) - print "NormalTask:----------------------\n", relatedStepObj.getArgs() - - def _paramTransform(self, argsDict, flowParser): - for (k, v) in argsDict.items(): - if isinstance(v, str): - if re.match("^\(\(\d+\..*\)\)", v): - v = v[2:-2] - stepId, outputParam = v.split(".") - stepId = int(stepId) - normalTask = flowParser.getNormalTask(stepId) - if normalTask is None: - continue - argsDict[k] = "${%s.output.response.body.%s}" % \ - (normalTask.getReferenceName(), outputParam) - elif isinstance(v, dict): - self._paramTransform(v, flowParser) - - def getStepId(self): - return self._stepId - - -class SwitchTask(BaseWorkflowTask): - seqNumber = 0 - - def __init__(self, order, stepObjArr, flowParser): - super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber)) - SwitchTask.seqNumber = SwitchTask.seqNumber + 1 - if 'name' in order: - self._name = order['name'] - self._type = "DECISION" - caseValueParam = 'value' - order['value'] = order['value'][2:-2] - stepId, outputParam = order['value'].split(".") - stepId = int(stepId) - normalTask = flowParser.getNormalTask(stepId) - caseValue = "${%s.output.response.body.%s}" % \ - (normalTask.getReferenceName(), outputParam) - self._args['inputParameters'] = {caseValueParam: caseValue} - self._args['caseValueParam'] = caseValueParam - self._args['decisionCases'] = {} - self._childTaskMetaList = [] - for case, caseOrders in order['cases'].items(): - self._args['decisionCases'][case], taskMetaList = \ - flowParser.parse(caseOrders, stepObjArr) - if taskMetaList is not None: - self._childTaskMetaList.extend(taskMetaList) - - def getTaskMetaList(self): - selfTaskMetaList = super(SwitchTask, self).getTaskMetaList() - selfTaskMetaList.extend(self._childTaskMetaList) - return selfTaskMetaList - - -class ParallelTask(BaseWorkflowTask): - seqNumber = 0 - - def __init__(self, order, stepObjArr, flowParser): - InstSeqNumber = ParallelTask.seqNumber - super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber)) - ParallelTask.seqNumber = ParallelTask.seqNumber + 1 - if 'name' in order: - self._name = order['name'] - self._type = "FORK_JOIN" - self._args['forkTasks'] = [] - self._childTaskMetaList = [] - lastTasksNameList = [] - parallelList = order['parallel'].items() - parallelList.sort() - for key, orderList in parallelList: - print orderList - taskList, taskMetaList = flowParser.parse(orderList, stepObjArr) - self._args['forkTasks'].append(taskList) - lastTasksNameList.append(taskList[-1]['taskReferenceName']) - if taskMetaList is not None: - self._childTaskMetaList.extend(taskMetaList) - self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList) - - def getTaskMetaList(self): - selfTaskMetaList = super(ParallelTask, self).getTaskMetaList() - selfTaskMetaList.extend(self._childTaskMetaList) - selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList()) - return selfTaskMetaList - - def getJoinTask(self): - return self._joinTaskObj - - -class ParallelJoinTask(BaseWorkflowTask): - def __init__(self, seqNumber, joinOnList): - super(ParallelJoinTask, self).__init__( - "paralleljoin_" + str(seqNumber)) - self._type = "JOIN" - self._args['joinOn'] = joinOnList - - -def getRandString(length): - return "".join(random.choice(str("0123456789")) for i in range(length)) +############################################################################## +# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import random +import collections +import re +from src.conductor_processor.task import TaskFile + + +class WorkflowFile(object): + def __init__(self, name): + self._name = "workflow_" + name + "(%s)" % getRandString(10) + self._description = '' + self._version = 1 + self._schemaVersion = 2 + self._tasks = [] + self._outputParameters = {} + + def getDict(self): + d = collections.OrderedDict() + d['name'] = self._name + d['description'] = self._description + d['version'] = self._version + d['schemaVersion'] = self._schemaVersion + d['tasks'] = self._tasks + d['outputParameters'] = self._outputParameters + + return d + + def generateMetaData(self, flowList, stepObjArr): + flowParser = FlowParser(flowList, stepObjArr) + self._tasks, taskMetaList = flowParser.parseMainFlow() + normalTasks = flowParser.getNormalTaskList() + for normalTask in normalTasks: + taskName = normalTask['name'] + referenceName = normalTask['taskReferenceName'] + self._outputParameters["%s(%s)" % (taskName, referenceName)] = \ + "${%s.output.response.body}" % referenceName + return self.getDict(), taskMetaList + + +class FlowParser(object): + def __init__(self, flowList, stepObjArr): + self._mainFlow = {} + self._subFlowDict = {} + self._stepObjArr = stepObjArr + self._normalTasks = [] + for flow in flowList: + if flow['name'] == "main": + self._mainFlow = flow + else: + self._subFlowDict[flow['name']] = flow + + def parseMainFlow(self): + return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr) + + def parse(self, obj, stepObjArr): + if isinstance(obj, str): + return self.parseFlow(obj, stepObjArr) + else: + return self.parseOrderList(obj, stepObjArr) + + def parseFlow(self, flowName, stepObjArr): + orderList = self._subFlowDict[flowName]['orders'] + return self.parseOrderList(orderList, stepObjArr) + + def parseOrderList(self, orderList, stepObjArr): + tasks = [] + taskMetaAllList = [] + for order in orderList: + if order['type'] == "normal": + genTask = NormalTask(order, stepObjArr, self) + self._normalTasks.append(genTask) + elif order['type'] == "switch": + genTask = SwitchTask(order, stepObjArr, self) + elif order['type'] == "parallel": + genTask = ParallelTask(order, stepObjArr, self) + tasks.append(genTask.getDict()) + + if order['type'] == "parallel": + joinTask = genTask.getJoinTask() + tasks.append(joinTask.getDict()) + + taskMetaList = genTask.getTaskMetaList() + if taskMetaList is not None: + taskMetaAllList.extend(taskMetaList) + return tasks, taskMetaAllList + + def getNormalTaskList(self): + normalTasksDict = [] + for normalTask in self._normalTasks: + normalTasksDict.append(normalTask.getDict()) + return normalTasksDict + + def getNormalTask(self, stepId): + for normalTask in self._normalTasks: + if normalTask.getStepId() == stepId: + return normalTask + return None + + +class BaseWorkflowTask(object): + def __init__(self, name): + self._name = name + self._taskReferenceName = self._name + "_task_%s" % getRandString(10) + self._type = '' + self._args = {} + + def __str__(self): + dictObj = self.getDict() + return str(dictObj) + + def getDict(self): + d1 = { + "name": self._name, + "taskReferenceName": self._taskReferenceName, + "type": self._type + } + return dict(d1, **self._args) + + def getName(self): + return self._name + + def getReferenceName(self): + return self._taskReferenceName + + def getTaskMetaList(self): + taskFile = TaskFile() + return [taskFile.generateFromStep(self)] + + +class NormalTask(BaseWorkflowTask): + def __init__(self, order, stepObjArr, flowParser): + relatedStepObj = stepObjArr[order['step'] - 1] + super(NormalTask, self).__init__(relatedStepObj.getName()) + self._taskReferenceName = "task_%s" % getRandString(10) + self._stepId = relatedStepObj.getId() + self._type = "HTTP" + self._args['inputParameters'] = relatedStepObj.getArgs() + self._paramTransform(self._args['inputParameters'], flowParser) + print "NormalTask:----------------------\n", relatedStepObj.getArgs() + + def _paramTransform(self, argsDict, flowParser): + for (k, v) in argsDict.items(): + if isinstance(v, str): + if re.match("^\(\(\d+\..*\)\)", v): + v = v[2:-2] + stepId, outputParam = v.split(".") + stepId = int(stepId) + normalTask = flowParser.getNormalTask(stepId) + if normalTask is None: + continue + argsDict[k] = "${%s.output.response.body.%s}" % \ + (normalTask.getReferenceName(), outputParam) + elif isinstance(v, dict): + self._paramTransform(v, flowParser) + + def getStepId(self): + return self._stepId + + +class SwitchTask(BaseWorkflowTask): + seqNumber = 0 + + def __init__(self, order, stepObjArr, flowParser): + super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber)) + SwitchTask.seqNumber = SwitchTask.seqNumber + 1 + if 'name' in order: + self._name = order['name'] + self._type = "DECISION" + caseValueParam = 'value' + order['value'] = order['value'][2:-2] + stepId, outputParam = order['value'].split(".") + stepId = int(stepId) + normalTask = flowParser.getNormalTask(stepId) + caseValue = "${%s.output.response.body.%s}" % \ + (normalTask.getReferenceName(), outputParam) + self._args['inputParameters'] = {caseValueParam: caseValue} + self._args['caseValueParam'] = caseValueParam + self._args['decisionCases'] = {} + self._childTaskMetaList = [] + for case, caseOrders in order['cases'].items(): + self._args['decisionCases'][case], taskMetaList = \ + flowParser.parse(caseOrders, stepObjArr) + if taskMetaList is not None: + self._childTaskMetaList.extend(taskMetaList) + + def getTaskMetaList(self): + selfTaskMetaList = super(SwitchTask, self).getTaskMetaList() + selfTaskMetaList.extend(self._childTaskMetaList) + return selfTaskMetaList + + +class ParallelTask(BaseWorkflowTask): + seqNumber = 0 + + def __init__(self, order, stepObjArr, flowParser): + InstSeqNumber = ParallelTask.seqNumber + super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber)) + ParallelTask.seqNumber = ParallelTask.seqNumber + 1 + if 'name' in order: + self._name = order['name'] + self._type = "FORK_JOIN" + self._args['forkTasks'] = [] + self._childTaskMetaList = [] + lastTasksNameList = [] + parallelList = order['parallel'].items() + parallelList.sort() + for key, orderList in parallelList: + print orderList + taskList, taskMetaList = flowParser.parse(orderList, stepObjArr) + self._args['forkTasks'].append(taskList) + lastTasksNameList.append(taskList[-1]['taskReferenceName']) + if taskMetaList is not None: + self._childTaskMetaList.extend(taskMetaList) + self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList) + + def getTaskMetaList(self): + selfTaskMetaList = super(ParallelTask, self).getTaskMetaList() + selfTaskMetaList.extend(self._childTaskMetaList) + selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList()) + return selfTaskMetaList + + def getJoinTask(self): + return self._joinTaskObj + + +class ParallelJoinTask(BaseWorkflowTask): + def __init__(self, seqNumber, joinOnList): + super(ParallelJoinTask, self).__init__( + "paralleljoin_" + str(seqNumber)) + self._type = "JOIN" + self._args['joinOn'] = joinOnList + + +def getRandString(length): + return "".join(random.choice(str("0123456789")) for i in range(length))