1 ##############################################################################
\r
2 # Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
\r
4 # All rights reserved. This program and the accompanying materials
\r
5 # are made available under the terms of the Apache License, Version 2.0
\r
6 # which accompanies this distribution, and is available at
\r
7 # http://www.apache.org/licenses/LICENSE-2.0
\r
8 ##############################################################################
\r
13 from src.conductor_processor.task import TaskFile
\r
class WorkflowFile(object):
    """Workflow definition assembled from a list of flows.

    generateMetaData() fills in the task list and the output parameters;
    getDict() renders the definition with a fixed key order.
    """

    def __init__(self, name):
        """Create an empty workflow definition.

        :param name: base name; the stored name becomes
            "workflow_<name>(<10 random digits>)".
        """
        self._name = "workflow_" + name + "(%s)" % getRandString(10)
        self._description = ''
        # NOTE(review): getDict() reads _version, but its initial value is
        # not visible in the reviewed excerpt; 1 is assumed - confirm.
        self._version = 1
        self._schemaVersion = 2
        # Overwritten by generateMetaData(); initialised so that getDict()
        # never hits a missing attribute.
        self._tasks = []
        self._outputParameters = {}

    def getDict(self):
        """Return the workflow definition as an OrderedDict.

        Keys are emitted in this order: name, description, version,
        schemaVersion, tasks, outputParameters.
        """
        d = collections.OrderedDict()
        d['name'] = self._name
        d['description'] = self._description
        d['version'] = self._version
        d['schemaVersion'] = self._schemaVersion
        d['tasks'] = self._tasks
        d['outputParameters'] = self._outputParameters
        return d

    def generateMetaData(self, flowList, stepObjArr):
        """Parse the flows and build the workflow definition.

        :param flowList: flow descriptions handed to FlowParser.
        :param stepObjArr: step objects handed to FlowParser.
        :return: tuple ``(workflow dict, task meta list)`` where the task
            meta list is whatever FlowParser.parseMainFlow() collected.
        """
        flowParser = FlowParser(flowList, stepObjArr)
        self._tasks, taskMetaList = flowParser.parseMainFlow()
        normalTasks = flowParser.getNormalTaskList()
        for normalTask in normalTasks:
            taskName = normalTask['name']
            referenceName = normalTask['taskReferenceName']
            # One output parameter per normal task, keyed "<name>(<ref>)"
            # and pointing at that task's response body.
            self._outputParameters["%s(%s)" % (taskName, referenceName)] = \
                "${%s.output.response.body}" % referenceName
        return self.getDict(), taskMetaList
\r
class FlowParser(object):
    """Turns flow descriptions into lists of workflow task dictionaries.

    A flow is a dict with a 'name' and an 'orders' list. The flow named
    "main" is the entry point; every other flow is kept by name so it can
    be referenced from an order list.
    """

    def __init__(self, flowList, stepObjArr):
        """Index the flows by name.

        :param flowList: iterable of flow dicts ('name', 'orders').
        :param stepObjArr: step objects, passed on to the task classes.
        """
        # NOTE(review): the initial value of _mainFlow is not visible in the
        # reviewed excerpt; an empty dict is assumed - confirm.
        self._mainFlow = {}
        self._subFlowDict = {}
        self._stepObjArr = stepObjArr
        # Every NormalTask created so far, in creation order.
        self._normalTasks = []
        for flow in flowList:
            if flow['name'] == "main":
                self._mainFlow = flow
            else:
                # NOTE(review): the excerpt has a gap where this `else` is
                # presumed to be; confirm "main" is kept out of the
                # sub-flow index.
                self._subFlowDict[flow['name']] = flow

    def parseMainFlow(self):
        """Parse the orders of the "main" flow.

        :return: same tuple as parseOrderList().
        """
        return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr)

    def parse(self, obj, stepObjArr):
        """Parse either a sub-flow name or an inline order list.

        :param obj: a ``str`` naming a sub-flow, or a list of orders.
        :return: same tuple as parseOrderList().
        """
        # NOTE(review): under Python 2 a ``unicode`` flow name is not a
        # ``str`` and would be treated as an order list.
        if isinstance(obj, str):
            return self.parseFlow(obj, stepObjArr)
        return self.parseOrderList(obj, stepObjArr)

    def parseFlow(self, flowName, stepObjArr):
        """Parse the orders of the sub-flow called ``flowName``.

        :raises KeyError: if no sub-flow with that name was registered.
        """
        orderList = self._subFlowDict[flowName]['orders']
        return self.parseOrderList(orderList, stepObjArr)

    def parseOrderList(self, orderList, stepObjArr):
        """Create one task per order and collect their dictionaries.

        :param orderList: list of order dicts; ``order['type']`` must be
            "normal", "switch" or "parallel".
        :return: tuple ``(tasks, taskMetaAllList)`` - the task dicts in
            order (a parallel task is followed by its join task) and the
            concatenated task meta lists.
        :raises ValueError: for an order type that is not supported.
        """
        tasks = []
        taskMetaAllList = []
        for order in orderList:
            orderType = order['type']
            if orderType == "normal":
                genTask = NormalTask(order, stepObjArr, self)
                # Registered only after construction, so a task can only
                # look up tasks created before it.
                self._normalTasks.append(genTask)
            elif orderType == "switch":
                genTask = SwitchTask(order, stepObjArr, self)
            elif orderType == "parallel":
                genTask = ParallelTask(order, stepObjArr, self)
            else:
                # Without this guard the previous iteration's task would be
                # silently emitted a second time.
                raise ValueError("unsupported order type: %r" % (orderType,))
            tasks.append(genTask.getDict())

            if orderType == "parallel":
                joinTask = genTask.getJoinTask()
                tasks.append(joinTask.getDict())

            taskMetaList = genTask.getTaskMetaList()
            if taskMetaList is not None:
                taskMetaAllList.extend(taskMetaList)
        return tasks, taskMetaAllList

    def getNormalTaskList(self):
        """Return the dict form of every normal task created so far."""
        return [normalTask.getDict() for normalTask in self._normalTasks]

    def getNormalTask(self, stepId):
        """Return the first normal task whose step id equals ``stepId``.

        :return: the task object, or None when there is no such task
            (callers test the result against None).
        """
        for normalTask in self._normalTasks:
            if normalTask.getStepId() == stepId:
                return normalTask
        return None
\r
class BaseWorkflowTask(object):
    """Common state and serialisation for all workflow task kinds.

    Subclasses set ``_type`` and add their type-specific entries to
    ``_args``; getDict() merges both into one task dictionary.
    """

    def __init__(self, name):
        """Initialise the task.

        :param name: task name; the reference name is derived from it as
            "<name>_task_<10 random digits>".
        """
        self._name = name
        self._taskReferenceName = self._name + "_task_%s" % getRandString(10)
        # NOTE(review): the initial values of _type and _args are not
        # visible in the reviewed excerpt. Subclasses assign _type and index
        # into _args right after calling this constructor, so an empty
        # string / empty dict is assumed - confirm.
        self._type = ''
        self._args = {}

    def __str__(self):
        # NOTE(review): the method name is not visible in the excerpt;
        # __str__ is assumed from the body returning str(...).
        dictObj = self.getDict()
        return str(dictObj)

    def getDict(self):
        """Return the task as a plain dict.

        Entries of ``_args`` are merged on top of the base keys, so an
        identically named argument overrides the base value.
        """
        d1 = {
            "name": self._name,
            "taskReferenceName": self._taskReferenceName,
            # NOTE(review): the "type" entry is restored from a gap in the
            # excerpt (subclasses set _type) - confirm.
            "type": self._type,
        }
        return dict(d1, **self._args)

    def getName(self):
        # NOTE(review): not visible in the excerpt; provided as the
        # companion accessor to getReferenceName() - confirm.
        return self._name

    def getReferenceName(self):
        """Return the task reference name."""
        return self._taskReferenceName

    def getTaskMetaList(self):
        """Return a one-element list with this task's meta data.

        The element is whatever ``TaskFile().generateFromStep(self)``
        returns.
        """
        taskFile = TaskFile()
        return [taskFile.generateFromStep(self)]
\r
class NormalTask(BaseWorkflowTask):
    """Task of type "HTTP" built from one step object.

    String arguments of the form "((<stepId>.<path>))" are rewritten to
    reference the response body of the task created for that step.
    """

    def __init__(self, order, stepObjArr, flowParser):
        """Build the task for ``order['step']``.

        :param order: order dict; ``order['step']`` is a 1-based index
            into ``stepObjArr``.
        :param stepObjArr: sequence of step objects (getName(), getId(),
            getArgs() are used).
        :param flowParser: parser used to look up earlier normal tasks.
        """
        relatedStepObj = stepObjArr[order['step'] - 1]
        super(NormalTask, self).__init__(relatedStepObj.getName())
        self._taskReferenceName = "task_%s" % getRandString(10)
        self._stepId = relatedStepObj.getId()
        self._type = "HTTP"
        self._args['inputParameters'] = relatedStepObj.getArgs()
        self._paramTransform(self._args['inputParameters'], flowParser)
        # Debug trace of the step arguments. Written as one pre-formatted
        # string so the output is the same whether `print` is a statement
        # or a function.
        print("NormalTask:----------------------\n%s"
              % (relatedStepObj.getArgs(),))

    def _paramTransform(self, argsDict, flowParser):
        """Rewrite "((stepId.path))" values of ``argsDict`` in place.

        Nested dicts are processed recursively. A reference to a step that
        has no task yet is left untouched.
        """
        for (k, v) in argsDict.items():
            # NOTE(review): under Python 2, ``unicode`` values are not
            # ``str`` and are therefore skipped.
            if isinstance(v, str):
                if re.match(r"^\(\(\d+\..*\)\)", v):
                    # Drop the surrounding "((" and "))", then split on the
                    # first dot only, so an output path that itself contains
                    # dots ("1.data.id") no longer breaks the unpacking.
                    stepId, outputParam = v[2:-2].split(".", 1)
                    stepId = int(stepId)
                    normalTask = flowParser.getNormalTask(stepId)
                    if normalTask is None:
                        continue
                    argsDict[k] = "${%s.output.response.body.%s}" % \
                        (normalTask.getReferenceName(), outputParam)
            elif isinstance(v, dict):
                self._paramTransform(v, flowParser)

    def getStepId(self):
        """Return the id of the step this task was built from."""
        return self._stepId
\r
class SwitchTask(BaseWorkflowTask):
    """Task of type "DECISION" that branches on an earlier task's output."""

    # Running counter used for the default task names.
    # NOTE(review): the starting value is not visible in the reviewed
    # excerpt; 0 is assumed - confirm.
    seqNumber = 0

    def __init__(self, order, stepObjArr, flowParser):
        """Build the decision task and parse each case's orders.

        :param order: order dict with 'value' ("((<stepId>.<path>))"),
            'cases' (case label -> orders or sub-flow name) and an
            optional 'name'.
        :param stepObjArr: step objects, passed on to nested tasks.
        :param flowParser: parser used to resolve the referenced task and
            to parse the case bodies.
        """
        super(SwitchTask, self).__init__(
            "switch_" + str(SwitchTask.seqNumber))
        SwitchTask.seqNumber = SwitchTask.seqNumber + 1
        if 'name' in order:
            self._name = order['name']
        self._type = "DECISION"
        caseValueParam = 'value'
        # Work on a local copy: writing the stripped text back into `order`
        # would corrupt it when the same order is parsed a second time.
        # Split on the first dot only so dotted output paths are accepted.
        valueRef = order['value'][2:-2]
        stepId, outputParam = valueRef.split(".", 1)
        stepId = int(stepId)
        normalTask = flowParser.getNormalTask(stepId)
        caseValue = "${%s.output.response.body.%s}" % \
            (normalTask.getReferenceName(), outputParam)
        self._args['inputParameters'] = {caseValueParam: caseValue}
        self._args['caseValueParam'] = caseValueParam
        self._args['decisionCases'] = {}
        self._childTaskMetaList = []
        for case, caseOrders in order['cases'].items():
            caseTasks, taskMetaList = flowParser.parse(caseOrders, stepObjArr)
            self._args['decisionCases'][case] = caseTasks
            if taskMetaList is not None:
                self._childTaskMetaList.extend(taskMetaList)

    def getTaskMetaList(self):
        """Return this task's meta data followed by that of all cases."""
        selfTaskMetaList = super(SwitchTask, self).getTaskMetaList()
        selfTaskMetaList.extend(self._childTaskMetaList)
        return selfTaskMetaList
\r
class ParallelTask(BaseWorkflowTask):
    """Task of type "FORK_JOIN" plus its companion join task."""

    # Running counter used for the default task names.
    # NOTE(review): the starting value is not visible in the reviewed
    # excerpt; 0 is assumed - confirm.
    seqNumber = 0

    def __init__(self, order, stepObjArr, flowParser):
        """Build the fork task and parse every parallel branch.

        :param order: order dict with 'parallel' (branch key -> orders or
            sub-flow name) and an optional 'name'.
        :param stepObjArr: step objects, passed on to nested tasks.
        :param flowParser: parser used to parse the branches.
        """
        InstSeqNumber = ParallelTask.seqNumber
        super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber))
        ParallelTask.seqNumber = ParallelTask.seqNumber + 1
        if 'name' in order:
            self._name = order['name']
        self._type = "FORK_JOIN"
        self._args['forkTasks'] = []
        self._childTaskMetaList = []
        lastTasksNameList = []
        # sorted() gives the same key ordering as items() + list.sort() and
        # also works where items() does not return a list.
        for _, orderList in sorted(order['parallel'].items()):
            taskList, taskMetaList = flowParser.parse(orderList, stepObjArr)
            self._args['forkTasks'].append(taskList)
            # The join waits on the last task of each branch.
            lastTasksNameList.append(taskList[-1]['taskReferenceName'])
            if taskMetaList is not None:
                self._childTaskMetaList.extend(taskMetaList)
        self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList)

    def getTaskMetaList(self):
        """Return meta data of this task, its branches and its join task."""
        selfTaskMetaList = super(ParallelTask, self).getTaskMetaList()
        selfTaskMetaList.extend(self._childTaskMetaList)
        selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList())
        return selfTaskMetaList

    def getJoinTask(self):
        """Return the ParallelJoinTask created for this fork."""
        return self._joinTaskObj
\r
class ParallelJoinTask(BaseWorkflowTask):
    """Task of type "JOIN" that closes a parallel section."""

    def __init__(self, seqNumber, joinOnList):
        """Create the join task.

        :param seqNumber: sequence number used in the task name
            ("paralleljoin_<seqNumber>").
        :param joinOnList: value stored under the 'joinOn' argument.
        """
        joinName = "paralleljoin_" + str(seqNumber)
        super(ParallelJoinTask, self).__init__(joinName)
        self._args['joinOn'] = joinOnList
        self._type = "JOIN"
\r
def getRandString(length):
    """Return a string of ``length`` random decimal digits."""
    digits = "0123456789"
    picked = [random.choice(digits) for _ in range(length)]
    return "".join(picked)
\r