19f0896c42bcca819bcff0ebff9934db63c83728
[bottlenecks.git] / testing-scheduler / server / src / conductor_processor / workflow.py
1 ##############################################################################\r
2 # Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others\r
3 #\r
4 # All rights reserved. This program and the accompanying materials\r
5 # are made available under the terms of the Apache License, Version 2.0\r
6 # which accompanies this distribution, and is available at\r
7 # http://www.apache.org/licenses/LICENSE-2.0\r
8 ##############################################################################\r
9 \r
10 import random\r
11 import collections\r
12 import re\r
13 from src.conductor_processor.task import TaskFile\r
14 \r
15 \r
class WorkflowFile(object):
    """In-memory representation of one workflow definition.

    The instance collects the workflow header fields, the generated task
    list and the output parameter mapping, and can render them as an
    ordered dictionary.
    """

    def __init__(self, name):
        """Create an empty workflow whose name embeds ``name``.

        A random 10-digit suffix in parentheses is appended to the name.
        """
        prefix = "workflow_" + name
        self._name = prefix + "(%s)" % getRandString(10)
        self._description = ''
        self._version = 1
        self._schemaVersion = 2
        self._tasks = []
        self._outputParameters = {}

    def getDict(self):
        """Return the workflow fields as an ``OrderedDict``.

        Key order is name, description, version, schemaVersion, tasks,
        outputParameters. The task list and output mapping are the live
        objects held by this instance, not copies.
        """
        return collections.OrderedDict([
            ('name', self._name),
            ('description', self._description),
            ('version', self._version),
            ('schemaVersion', self._schemaVersion),
            ('tasks', self._tasks),
            ('outputParameters', self._outputParameters),
        ])

    def generateMetaData(self, flowList, stepObjArr):
        """Populate the workflow from the flows and return its metadata.

        The main flow is parsed into the task list, then one output
        parameter is registered per normal task, keyed by
        ``"<name>(<taskReferenceName>)"`` and pointing at that task's
        response body.

        Returns a tuple ``(workflowDict, taskMetaList)``.
        """
        parser = FlowParser(flowList, stepObjArr)
        self._tasks, taskMetaList = parser.parseMainFlow()
        for taskDict in parser.getNormalTaskList():
            refName = taskDict['taskReferenceName']
            outputKey = "%s(%s)" % (taskDict['name'], refName)
            self._outputParameters[outputKey] = \
                "${%s.output.response.body}" % refName
        return self.getDict(), taskMetaList
46 \r
47 \r
class FlowParser(object):
    """Turns flow descriptions into lists of workflow task dictionaries.

    ``flowList`` is a list of flow dicts, each with a ``name`` and an
    ``orders`` list. The flow named ``"main"`` is the entry point; every
    other flow is stored by name so it can be referenced as a sub-flow.
    Every normal task created while parsing is remembered so that later
    tasks can look it up by step id.
    """

    def __init__(self, flowList, stepObjArr):
        """Index the flows by name and keep the step objects for parsing."""
        self._mainFlow = {}
        self._subFlowDict = {}
        self._stepObjArr = stepObjArr
        self._normalTasks = []
        for flow in flowList:
            if flow['name'] == "main":
                self._mainFlow = flow
            else:
                self._subFlowDict[flow['name']] = flow

    def parseMainFlow(self):
        """Parse the orders of the main flow.

        Returns ``(tasks, taskMetaList)``. Raises ``KeyError`` when no
        flow named ``"main"`` was supplied.
        """
        return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr)

    def parse(self, obj, stepObjArr):
        """Parse either a sub-flow name or an inline order list.

        Text values (``str``, and ``unicode`` on Python 2) are treated as
        sub-flow names; anything else is treated as a list of orders.
        """
        # type(u"") is ``unicode`` on Python 2 and ``str`` on Python 3, so
        # flow names are recognised whichever text type the caller holds.
        if isinstance(obj, (str, type(u""))):
            return self.parseFlow(obj, stepObjArr)
        return self.parseOrderList(obj, stepObjArr)

    def parseFlow(self, flowName, stepObjArr):
        """Parse the orders of the sub-flow called ``flowName``.

        Raises ``KeyError`` when no such sub-flow exists.
        """
        orderList = self._subFlowDict[flowName]['orders']
        return self.parseOrderList(orderList, stepObjArr)

    def parseOrderList(self, orderList, stepObjArr):
        """Convert a list of orders into task dicts and task metadata.

        Each order must have a ``type`` of ``"normal"``, ``"switch"`` or
        ``"parallel"``. A parallel order contributes two task dicts: the
        fork task followed by its join task.

        Returns ``(tasks, taskMetaAllList)``.
        Raises ``ValueError`` for an unsupported order type.
        """
        tasks = []
        taskMetaAllList = []
        for order in orderList:
            orderType = order['type']
            if orderType == "normal":
                genTask = NormalTask(order, stepObjArr, self)
                self._normalTasks.append(genTask)
            elif orderType == "switch":
                genTask = SwitchTask(order, stepObjArr, self)
            elif orderType == "parallel":
                genTask = ParallelTask(order, stepObjArr, self)
            else:
                # Without this branch an unknown type would either hit an
                # unbound ``genTask`` or silently re-emit the task built
                # for the previous order.
                raise ValueError(
                    "unsupported order type: %r" % (orderType,))
            tasks.append(genTask.getDict())

            if orderType == "parallel":
                # The join task must directly follow its fork task.
                tasks.append(genTask.getJoinTask().getDict())

            taskMetaList = genTask.getTaskMetaList()
            if taskMetaList is not None:
                taskMetaAllList.extend(taskMetaList)
        return tasks, taskMetaAllList

    def getNormalTaskList(self):
        """Return the dict form of every normal task parsed so far."""
        return [normalTask.getDict() for normalTask in self._normalTasks]

    def getNormalTask(self, stepId):
        """Return the first parsed normal task for ``stepId``, or None."""
        for normalTask in self._normalTasks:
            if normalTask.getStepId() == stepId:
                return normalTask
        return None
106 \r
107 \r
class BaseWorkflowTask(object):
    """Common state and serialisation shared by all workflow task kinds.

    Subclasses set ``_type`` and add type-specific entries to ``_args``.
    """

    def __init__(self, name):
        """Store ``name`` and derive a randomised reference name from it."""
        refSuffix = "_task_%s" % getRandString(10)
        self._name = name
        self._taskReferenceName = self._name + refSuffix
        self._type = ''
        self._args = {}

    def __str__(self):
        """Return the string form of the task dictionary."""
        return str(self.getDict())

    def getDict(self):
        """Return name, reference name and type merged with ``_args``.

        Entries in ``_args`` take precedence over the three base fields.
        """
        baseFields = dict(
            name=self._name,
            taskReferenceName=self._taskReferenceName,
            type=self._type,
        )
        return dict(baseFields, **self._args)

    def getName(self):
        """Return the task name."""
        return self._name

    def getReferenceName(self):
        """Return the task reference name."""
        return self._taskReferenceName

    def getTaskMetaList(self):
        """Return a one-element list with the metadata for this task.

        The metadata is whatever ``TaskFile.generateFromStep`` produces
        for this task object.
        """
        return [TaskFile().generateFromStep(self)]
136 \r
137 \r
class NormalTask(BaseWorkflowTask):
    """HTTP task generated from one step object.

    The step's arguments become the task's ``inputParameters``. Any
    argument value of the form ``((<stepId>.<outputParam>))`` is rewritten
    into an expression that reads ``outputParam`` from the response body
    of the normal task previously generated for ``stepId``.
    """

    # Matches a whole "((<stepId>.<outputParam>))" reference. The step id
    # ends at the first dot, so the output parameter may itself contain
    # dots. Anchoring the end ensures the captured groups never include
    # text that follows the closing parentheses.
    _REFERENCE_PATTERN = re.compile(r"^\(\((\d+)\.(.*)\)\)$")

    def __init__(self, order, stepObjArr, flowParser):
        """Build the task for the step selected by ``order['step']``.

        ``order['step']`` is a 1-based index into ``stepObjArr``.
        """
        relatedStepObj = stepObjArr[order['step'] - 1]
        super(NormalTask, self).__init__(relatedStepObj.getName())
        # Normal tasks use a reference name that omits the step name.
        self._taskReferenceName = "task_%s" % getRandString(10)
        self._stepId = relatedStepObj.getId()
        self._type = "HTTP"
        self._args['inputParameters'] = relatedStepObj.getArgs()
        self._paramTransform(self._args['inputParameters'], flowParser)
        # Debug trace kept for output compatibility; the single-argument
        # call form prints the same text on Python 2 and Python 3.
        print("NormalTask:----------------------\n%s"
              % (relatedStepObj.getArgs(),))

    def _paramTransform(self, argsDict, flowParser):
        """Rewrite step-output references inside ``argsDict`` in place.

        Nested dictionaries are processed recursively. A reference to a
        step that has no normal task yet is left untouched.
        """
        # ``unicode`` on Python 2, ``str`` again on Python 3.
        textTypes = (str, type(u""))
        # Iterate over a snapshot because values are reassigned below.
        for key, value in list(argsDict.items()):
            if isinstance(value, dict):
                self._paramTransform(value, flowParser)
                continue
            if not isinstance(value, textTypes):
                continue
            matched = self._REFERENCE_PATTERN.match(value)
            if matched is None:
                continue
            normalTask = flowParser.getNormalTask(int(matched.group(1)))
            if normalTask is None:
                continue
            argsDict[key] = "${%s.output.response.body.%s}" % (
                normalTask.getReferenceName(), matched.group(2))

    def getStepId(self):
        """Return the id of the step this task was generated from."""
        return self._stepId
166 \r
167 \r
class SwitchTask(BaseWorkflowTask):
    """DECISION task that branches on an output value of an earlier step.

    ``order['value']`` has the form ``((<stepId>.<outputParam>))`` and
    ``order['cases']`` maps each case value to a sub-flow name or an
    inline order list.
    """

    # Class-wide counter used to give each switch a distinct default name.
    seqNumber = 0

    def __init__(self, order, stepObjArr, flowParser):
        """Build the decision task and parse every case branch.

        Raises ``ValueError`` when the referenced step has no normal task
        yet, or when the reference is malformed.
        """
        super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber))
        SwitchTask.seqNumber = SwitchTask.seqNumber + 1
        if 'name' in order:
            self._name = order['name']
        self._type = "DECISION"
        caseValueParam = 'value'
        # Strip the surrounding "((" and "))" into a local. Writing the
        # result back into ``order`` would corrupt the reference if the
        # same order is parsed again (a sub-flow used in several places).
        reference = order['value'][2:-2]
        # Split on the first dot only so the output path may contain dots.
        stepId, outputParam = reference.split(".", 1)
        stepId = int(stepId)
        normalTask = flowParser.getNormalTask(stepId)
        if normalTask is None:
            raise ValueError(
                "switch refers to step %d, which has no task yet" % stepId)
        caseValue = "${%s.output.response.body.%s}" % \
            (normalTask.getReferenceName(), outputParam)
        self._args['inputParameters'] = {caseValueParam: caseValue}
        self._args['caseValueParam'] = caseValueParam
        self._args['decisionCases'] = {}
        self._childTaskMetaList = []
        for case, caseOrders in order['cases'].items():
            self._args['decisionCases'][case], taskMetaList = \
                flowParser.parse(caseOrders, stepObjArr)
            if taskMetaList is not None:
                self._childTaskMetaList.extend(taskMetaList)

    def getTaskMetaList(self):
        """Return this task's metadata followed by its branches' metadata."""
        selfTaskMetaList = super(SwitchTask, self).getTaskMetaList()
        selfTaskMetaList.extend(self._childTaskMetaList)
        return selfTaskMetaList
198 \r
199 \r
class ParallelTask(BaseWorkflowTask):
    """FORK_JOIN task that runs several branches side by side.

    ``order['parallel']`` maps a branch key to a sub-flow name or an
    inline order list. Branches are processed in sorted key order. A
    companion ``ParallelJoinTask`` is created that joins on the last task
    of every non-empty branch.
    """

    # Class-wide counter used to give each fork a distinct default name.
    seqNumber = 0

    def __init__(self, order, stepObjArr, flowParser):
        """Build the fork task, parse each branch and create the join."""
        InstSeqNumber = ParallelTask.seqNumber
        super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber))
        ParallelTask.seqNumber = ParallelTask.seqNumber + 1
        if 'name' in order:
            self._name = order['name']
        self._type = "FORK_JOIN"
        self._args['forkTasks'] = []
        self._childTaskMetaList = []
        lastTasksNameList = []
        # sorted() works on both the Python 2 list and the Python 3 view
        # returned by items().
        for key, orderList in sorted(order['parallel'].items()):
            # Debug trace kept for output compatibility; this call form
            # prints the same text on Python 2 and Python 3.
            print(orderList)
            taskList, taskMetaList = flowParser.parse(orderList, stepObjArr)
            self._args['forkTasks'].append(taskList)
            if taskList:
                # An empty branch has no last task to join on.
                lastTasksNameList.append(taskList[-1]['taskReferenceName'])
            if taskMetaList is not None:
                self._childTaskMetaList.extend(taskMetaList)
        self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList)

    def getTaskMetaList(self):
        """Return metadata for the fork, its branches and the join task."""
        selfTaskMetaList = super(ParallelTask, self).getTaskMetaList()
        selfTaskMetaList.extend(self._childTaskMetaList)
        selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList())
        return selfTaskMetaList

    def getJoinTask(self):
        """Return the join task paired with this fork."""
        return self._joinTaskObj
232 \r
233 \r
class ParallelJoinTask(BaseWorkflowTask):
    """JOIN task that waits for the listed task reference names."""

    def __init__(self, seqNumber, joinOnList):
        """Name the task ``paralleljoin_<seqNumber>`` and set its join list.

        ``joinOnList`` is stored as given under the ``joinOn`` argument.
        """
        taskName = "paralleljoin_" + str(seqNumber)
        super(ParallelJoinTask, self).__init__(taskName)
        self._args['joinOn'] = joinOnList
        self._type = "JOIN"
240 \r
241 \r
def getRandString(length):
    """Return a string of ``length`` randomly chosen decimal digits."""
    digits = str("0123456789")
    picked = [random.choice(digits) for _ in range(length)]
    return "".join(picked)