1 ##############################################################################
2 # Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
13 from src.conductor_processor.task import TaskFile
16 class WorkflowFile(object):
17 def __init__(self, name):
18 self._name = "workflow_" + name + "(%s)" % getRandString(10)
19 self._description = ''
21 self._schemaVersion = 2
23 self._outputParameters = {}
26 d = collections.OrderedDict()
27 d['name'] = self._name
28 d['description'] = self._description
29 d['version'] = self._version
30 d['schemaVersion'] = self._schemaVersion
31 d['tasks'] = self._tasks
32 d['outputParameters'] = self._outputParameters
36 def generateMetaData(self, flowList, stepObjArr):
37 flowParser = FlowParser(flowList, stepObjArr)
38 self._tasks, taskMetaList = flowParser.parseMainFlow()
39 normalTasks = flowParser.getNormalTaskList()
40 for normalTask in normalTasks:
41 taskName = normalTask['name']
42 referenceName = normalTask['taskReferenceName']
43 self._outputParameters["%s(%s)" % (taskName, referenceName)] = \
44 "${%s.output.response.body}" % referenceName
45 return self.getDict(), taskMetaList
48 class FlowParser(object):
49 def __init__(self, flowList, stepObjArr):
51 self._subFlowDict = {}
52 self._stepObjArr = stepObjArr
53 self._normalTasks = []
55 if flow['name'] == "main":
58 self._subFlowDict[flow['name']] = flow
60 def parseMainFlow(self):
61 return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr)
63 def parse(self, obj, stepObjArr):
64 if isinstance(obj, str):
65 return self.parseFlow(obj, stepObjArr)
67 return self.parseOrderList(obj, stepObjArr)
69 def parseFlow(self, flowName, stepObjArr):
70 orderList = self._subFlowDict[flowName]['orders']
71 return self.parseOrderList(orderList, stepObjArr)
73 def parseOrderList(self, orderList, stepObjArr):
76 for order in orderList:
77 if order['type'] == "normal":
78 genTask = NormalTask(order, stepObjArr, self)
79 self._normalTasks.append(genTask)
80 elif order['type'] == "switch":
81 genTask = SwitchTask(order, stepObjArr, self)
82 elif order['type'] == "parallel":
83 genTask = ParallelTask(order, stepObjArr, self)
84 tasks.append(genTask.getDict())
86 if order['type'] == "parallel":
87 joinTask = genTask.getJoinTask()
88 tasks.append(joinTask.getDict())
90 taskMetaList = genTask.getTaskMetaList()
91 if taskMetaList is not None:
92 taskMetaAllList.extend(taskMetaList)
93 return tasks, taskMetaAllList
95 def getNormalTaskList(self):
97 for normalTask in self._normalTasks:
98 normalTasksDict.append(normalTask.getDict())
99 return normalTasksDict
101 def getNormalTask(self, stepId):
102 for normalTask in self._normalTasks:
103 if normalTask.getStepId() == stepId:
108 class BaseWorkflowTask(object):
109 def __init__(self, name):
111 self._taskReferenceName = self._name + "_task_%s" % getRandString(10)
116 dictObj = self.getDict()
122 "taskReferenceName": self._taskReferenceName,
125 return dict(d1, **self._args)
130 def getReferenceName(self):
131 return self._taskReferenceName
133 def getTaskMetaList(self):
134 taskFile = TaskFile()
135 return [taskFile.generateFromStep(self)]
138 class NormalTask(BaseWorkflowTask):
139 def __init__(self, order, stepObjArr, flowParser):
140 relatedStepObj = stepObjArr[order['step'] - 1]
141 super(NormalTask, self).__init__(relatedStepObj.getName())
142 self._taskReferenceName = "task_%s" % getRandString(10)
143 self._stepId = relatedStepObj.getId()
145 self._args['inputParameters'] = relatedStepObj.getArgs()
146 self._paramTransform(self._args['inputParameters'], flowParser)
147 print "NormalTask:----------------------\n", relatedStepObj.getArgs()
149 def _paramTransform(self, argsDict, flowParser):
150 for (k, v) in argsDict.items():
151 if isinstance(v, str):
152 if re.match("^\(\(\d+\..*\)\)", v):
154 stepId, outputParam = v.split(".")
156 normalTask = flowParser.getNormalTask(stepId)
157 if normalTask is None:
159 argsDict[k] = "${%s.output.response.body.%s}" % \
160 (normalTask.getReferenceName(), outputParam)
161 elif isinstance(v, dict):
162 self._paramTransform(v, flowParser)
168 class SwitchTask(BaseWorkflowTask):
171 def __init__(self, order, stepObjArr, flowParser):
172 super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber))
173 SwitchTask.seqNumber = SwitchTask.seqNumber + 1
175 self._name = order['name']
176 self._type = "DECISION"
177 caseValueParam = 'value'
178 order['value'] = order['value'][2:-2]
179 stepId, outputParam = order['value'].split(".")
181 normalTask = flowParser.getNormalTask(stepId)
182 caseValue = "${%s.output.response.body.%s}" % \
183 (normalTask.getReferenceName(), outputParam)
184 self._args['inputParameters'] = {caseValueParam: caseValue}
185 self._args['caseValueParam'] = caseValueParam
186 self._args['decisionCases'] = {}
187 self._childTaskMetaList = []
188 for case, caseOrders in order['cases'].items():
189 self._args['decisionCases'][case], taskMetaList = \
190 flowParser.parse(caseOrders, stepObjArr)
191 if taskMetaList is not None:
192 self._childTaskMetaList.extend(taskMetaList)
194 def getTaskMetaList(self):
195 selfTaskMetaList = super(SwitchTask, self).getTaskMetaList()
196 selfTaskMetaList.extend(self._childTaskMetaList)
197 return selfTaskMetaList
200 class ParallelTask(BaseWorkflowTask):
203 def __init__(self, order, stepObjArr, flowParser):
204 InstSeqNumber = ParallelTask.seqNumber
205 super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber))
206 ParallelTask.seqNumber = ParallelTask.seqNumber + 1
208 self._name = order['name']
209 self._type = "FORK_JOIN"
210 self._args['forkTasks'] = []
211 self._childTaskMetaList = []
212 lastTasksNameList = []
213 parallelList = order['parallel'].items()
215 for key, orderList in parallelList:
217 taskList, taskMetaList = flowParser.parse(orderList, stepObjArr)
218 self._args['forkTasks'].append(taskList)
219 lastTasksNameList.append(taskList[-1]['taskReferenceName'])
220 if taskMetaList is not None:
221 self._childTaskMetaList.extend(taskMetaList)
222 self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList)
224 def getTaskMetaList(self):
225 selfTaskMetaList = super(ParallelTask, self).getTaskMetaList()
226 selfTaskMetaList.extend(self._childTaskMetaList)
227 selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList())
228 return selfTaskMetaList
230 def getJoinTask(self):
231 return self._joinTaskObj
234 class ParallelJoinTask(BaseWorkflowTask):
235 def __init__(self, seqNumber, joinOnList):
236 super(ParallelJoinTask, self).__init__(
237 "paralleljoin_" + str(seqNumber))
239 self._args['joinOn'] = joinOnList
242 def getRandString(length):
243 return "".join(random.choice(str("0123456789")) for i in range(length))