Change naming and veriy test-scheduler function
[bottlenecks.git] / test-scheduler / server / src / conductor_processor / workflow.py
1 ##############################################################################
2 # Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import random
11 import collections
12 import re
13 from src.conductor_processor.task import TaskFile
14
15
16 class WorkflowFile(object):
17     def __init__(self, name):
18         self._name = "workflow_" + name + "(%s)" % getRandString(10)
19         self._description = ''
20         self._version = 1
21         self._schemaVersion = 2
22         self._tasks = []
23         self._outputParameters = {}
24
25     def getDict(self):
26         d = collections.OrderedDict()
27         d['name'] = self._name
28         d['description'] = self._description
29         d['version'] = self._version
30         d['schemaVersion'] = self._schemaVersion
31         d['tasks'] = self._tasks
32         d['outputParameters'] = self._outputParameters
33
34         return d
35
36     def generateMetaData(self, flowList, stepObjArr):
37         flowParser = FlowParser(flowList, stepObjArr)
38         self._tasks, taskMetaList = flowParser.parseMainFlow()
39         normalTasks = flowParser.getNormalTaskList()
40         for normalTask in normalTasks:
41             taskName = normalTask['name']
42             referenceName = normalTask['taskReferenceName']
43             self._outputParameters["%s(%s)" % (taskName, referenceName)] = \
44                 "${%s.output.response.body}" % referenceName
45         return self.getDict(), taskMetaList
46
47
48 class FlowParser(object):
49     def __init__(self, flowList, stepObjArr):
50         self._mainFlow = {}
51         self._subFlowDict = {}
52         self._stepObjArr = stepObjArr
53         self._normalTasks = []
54         for flow in flowList:
55             if flow['name'] == "main":
56                 self._mainFlow = flow
57             else:
58                 self._subFlowDict[flow['name']] = flow
59
60     def parseMainFlow(self):
61         return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr)
62
63     def parse(self, obj, stepObjArr):
64         if isinstance(obj, str):
65             return self.parseFlow(obj, stepObjArr)
66         else:
67             return self.parseOrderList(obj, stepObjArr)
68
69     def parseFlow(self, flowName, stepObjArr):
70         orderList = self._subFlowDict[flowName]['orders']
71         return self.parseOrderList(orderList, stepObjArr)
72
73     def parseOrderList(self, orderList, stepObjArr):
74         tasks = []
75         taskMetaAllList = []
76         for order in orderList:
77             if order['type'] == "normal":
78                 genTask = NormalTask(order, stepObjArr, self)
79                 self._normalTasks.append(genTask)
80             elif order['type'] == "switch":
81                 genTask = SwitchTask(order, stepObjArr, self)
82             elif order['type'] == "parallel":
83                 genTask = ParallelTask(order, stepObjArr, self)
84             tasks.append(genTask.getDict())
85
86             if order['type'] == "parallel":
87                 joinTask = genTask.getJoinTask()
88                 tasks.append(joinTask.getDict())
89
90             taskMetaList = genTask.getTaskMetaList()
91             if taskMetaList is not None:
92                 taskMetaAllList.extend(taskMetaList)
93         return tasks, taskMetaAllList
94
95     def getNormalTaskList(self):
96         normalTasksDict = []
97         for normalTask in self._normalTasks:
98             normalTasksDict.append(normalTask.getDict())
99         return normalTasksDict
100
101     def getNormalTask(self, stepId):
102         for normalTask in self._normalTasks:
103             if normalTask.getStepId() == stepId:
104                 return normalTask
105         return None
106
107
108 class BaseWorkflowTask(object):
109     def __init__(self, name):
110         self._name = name
111         self._taskReferenceName = self._name + "_task_%s" % getRandString(10)
112         self._type = ''
113         self._args = {}
114
115     def __str__(self):
116         dictObj = self.getDict()
117         return str(dictObj)
118
119     def getDict(self):
120         d1 = {
121             "name": self._name,
122             "taskReferenceName": self._taskReferenceName,
123             "type": self._type
124         }
125         return dict(d1, **self._args)
126
127     def getName(self):
128         return self._name
129
130     def getReferenceName(self):
131         return self._taskReferenceName
132
133     def getTaskMetaList(self):
134         taskFile = TaskFile()
135         return [taskFile.generateFromStep(self)]
136
137
138 class NormalTask(BaseWorkflowTask):
139     def __init__(self, order, stepObjArr, flowParser):
140         relatedStepObj = stepObjArr[order['step'] - 1]
141         super(NormalTask, self).__init__(relatedStepObj.getName())
142         self._taskReferenceName = "task_%s" % getRandString(10)
143         self._stepId = relatedStepObj.getId()
144         self._type = "HTTP"
145         self._args['inputParameters'] = relatedStepObj.getArgs()
146         self._paramTransform(self._args['inputParameters'], flowParser)
147         print "NormalTask:----------------------\n", relatedStepObj.getArgs()
148
149     def _paramTransform(self, argsDict, flowParser):
150         for (k, v) in argsDict.items():
151             if isinstance(v, str):
152                 if re.match("^\(\(\d+\..*\)\)", v):
153                     v = v[2:-2]
154                     stepId, outputParam = v.split(".")
155                     stepId = int(stepId)
156                     normalTask = flowParser.getNormalTask(stepId)
157                     if normalTask is None:
158                         continue
159                     argsDict[k] = "${%s.output.response.body.%s}" %  \
160                         (normalTask.getReferenceName(), outputParam)
161             elif isinstance(v, dict):
162                 self._paramTransform(v, flowParser)
163
164     def getStepId(self):
165         return self._stepId
166
167
168 class SwitchTask(BaseWorkflowTask):
169     seqNumber = 0
170
171     def __init__(self, order, stepObjArr, flowParser):
172         super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber))
173         SwitchTask.seqNumber = SwitchTask.seqNumber + 1
174         if 'name' in order:
175             self._name = order['name']
176         self._type = "DECISION"
177         caseValueParam = 'value'
178         order['value'] = order['value'][2:-2]
179         stepId, outputParam = order['value'].split(".")
180         stepId = int(stepId)
181         normalTask = flowParser.getNormalTask(stepId)
182         caseValue = "${%s.output.response.body.%s}" % \
183             (normalTask.getReferenceName(), outputParam)
184         self._args['inputParameters'] = {caseValueParam: caseValue}
185         self._args['caseValueParam'] = caseValueParam
186         self._args['decisionCases'] = {}
187         self._childTaskMetaList = []
188         for case, caseOrders in order['cases'].items():
189             self._args['decisionCases'][case], taskMetaList =  \
190                 flowParser.parse(caseOrders, stepObjArr)
191             if taskMetaList is not None:
192                 self._childTaskMetaList.extend(taskMetaList)
193
194     def getTaskMetaList(self):
195         selfTaskMetaList = super(SwitchTask, self).getTaskMetaList()
196         selfTaskMetaList.extend(self._childTaskMetaList)
197         return selfTaskMetaList
198
199
200 class ParallelTask(BaseWorkflowTask):
201     seqNumber = 0
202
203     def __init__(self, order, stepObjArr, flowParser):
204         InstSeqNumber = ParallelTask.seqNumber
205         super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber))
206         ParallelTask.seqNumber = ParallelTask.seqNumber + 1
207         if 'name' in order:
208             self._name = order['name']
209         self._type = "FORK_JOIN"
210         self._args['forkTasks'] = []
211         self._childTaskMetaList = []
212         lastTasksNameList = []
213         parallelList = order['parallel'].items()
214         parallelList.sort()
215         for key, orderList in parallelList:
216             print orderList
217             taskList, taskMetaList = flowParser.parse(orderList, stepObjArr)
218             self._args['forkTasks'].append(taskList)
219             lastTasksNameList.append(taskList[-1]['taskReferenceName'])
220             if taskMetaList is not None:
221                 self._childTaskMetaList.extend(taskMetaList)
222         self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList)
223
224     def getTaskMetaList(self):
225         selfTaskMetaList = super(ParallelTask, self).getTaskMetaList()
226         selfTaskMetaList.extend(self._childTaskMetaList)
227         selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList())
228         return selfTaskMetaList
229
230     def getJoinTask(self):
231         return self._joinTaskObj
232
233
234 class ParallelJoinTask(BaseWorkflowTask):
235     def __init__(self, seqNumber, joinOnList):
236         super(ParallelJoinTask, self).__init__(
237             "paralleljoin_" + str(seqNumber))
238         self._type = "JOIN"
239         self._args['joinOn'] = joinOnList
240
241
242 def getRandString(length):
243     return "".join(random.choice(str("0123456789")) for i in range(length))