Change naming and veriy test-scheduler function
[bottlenecks.git] / test-scheduler / server / src / conductor_processor / workflow.py
-##############################################################################\r
-# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others\r
-#\r
-# All rights reserved. This program and the accompanying materials\r
-# are made available under the terms of the Apache License, Version 2.0\r
-# which accompanies this distribution, and is available at\r
-# http://www.apache.org/licenses/LICENSE-2.0\r
-##############################################################################\r
-\r
-import random\r
-import collections\r
-import re\r
-from src.conductor_processor.task import TaskFile\r
-\r
-\r
-class WorkflowFile(object):\r
-    def __init__(self, name):\r
-        self._name = "workflow_" + name + "(%s)" % getRandString(10)\r
-        self._description = ''\r
-        self._version = 1\r
-        self._schemaVersion = 2\r
-        self._tasks = []\r
-        self._outputParameters = {}\r
-\r
-    def getDict(self):\r
-        d = collections.OrderedDict()\r
-        d['name'] = self._name\r
-        d['description'] = self._description\r
-        d['version'] = self._version\r
-        d['schemaVersion'] = self._schemaVersion\r
-        d['tasks'] = self._tasks\r
-        d['outputParameters'] = self._outputParameters\r
-\r
-        return d\r
-\r
-    def generateMetaData(self, flowList, stepObjArr):\r
-        flowParser = FlowParser(flowList, stepObjArr)\r
-        self._tasks, taskMetaList = flowParser.parseMainFlow()\r
-        normalTasks = flowParser.getNormalTaskList()\r
-        for normalTask in normalTasks:\r
-            taskName = normalTask['name']\r
-            referenceName = normalTask['taskReferenceName']\r
-            self._outputParameters["%s(%s)" % (taskName, referenceName)] = \\r
-                "${%s.output.response.body}" % referenceName\r
-        return self.getDict(), taskMetaList\r
-\r
-\r
-class FlowParser(object):\r
-    def __init__(self, flowList, stepObjArr):\r
-        self._mainFlow = {}\r
-        self._subFlowDict = {}\r
-        self._stepObjArr = stepObjArr\r
-        self._normalTasks = []\r
-        for flow in flowList:\r
-            if flow['name'] == "main":\r
-                self._mainFlow = flow\r
-            else:\r
-                self._subFlowDict[flow['name']] = flow\r
-\r
-    def parseMainFlow(self):\r
-        return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr)\r
-\r
-    def parse(self, obj, stepObjArr):\r
-        if isinstance(obj, str):\r
-            return self.parseFlow(obj, stepObjArr)\r
-        else:\r
-            return self.parseOrderList(obj, stepObjArr)\r
-\r
-    def parseFlow(self, flowName, stepObjArr):\r
-        orderList = self._subFlowDict[flowName]['orders']\r
-        return self.parseOrderList(orderList, stepObjArr)\r
-\r
-    def parseOrderList(self, orderList, stepObjArr):\r
-        tasks = []\r
-        taskMetaAllList = []\r
-        for order in orderList:\r
-            if order['type'] == "normal":\r
-                genTask = NormalTask(order, stepObjArr, self)\r
-                self._normalTasks.append(genTask)\r
-            elif order['type'] == "switch":\r
-                genTask = SwitchTask(order, stepObjArr, self)\r
-            elif order['type'] == "parallel":\r
-                genTask = ParallelTask(order, stepObjArr, self)\r
-            tasks.append(genTask.getDict())\r
-\r
-            if order['type'] == "parallel":\r
-                joinTask = genTask.getJoinTask()\r
-                tasks.append(joinTask.getDict())\r
-\r
-            taskMetaList = genTask.getTaskMetaList()\r
-            if taskMetaList is not None:\r
-                taskMetaAllList.extend(taskMetaList)\r
-        return tasks, taskMetaAllList\r
-\r
-    def getNormalTaskList(self):\r
-        normalTasksDict = []\r
-        for normalTask in self._normalTasks:\r
-            normalTasksDict.append(normalTask.getDict())\r
-        return normalTasksDict\r
-\r
-    def getNormalTask(self, stepId):\r
-        for normalTask in self._normalTasks:\r
-            if normalTask.getStepId() == stepId:\r
-                return normalTask\r
-        return None\r
-\r
-\r
-class BaseWorkflowTask(object):\r
-    def __init__(self, name):\r
-        self._name = name\r
-        self._taskReferenceName = self._name + "_task_%s" % getRandString(10)\r
-        self._type = ''\r
-        self._args = {}\r
-\r
-    def __str__(self):\r
-        dictObj = self.getDict()\r
-        return str(dictObj)\r
-\r
-    def getDict(self):\r
-        d1 = {\r
-            "name": self._name,\r
-            "taskReferenceName": self._taskReferenceName,\r
-            "type": self._type\r
-        }\r
-        return dict(d1, **self._args)\r
-\r
-    def getName(self):\r
-        return self._name\r
-\r
-    def getReferenceName(self):\r
-        return self._taskReferenceName\r
-\r
-    def getTaskMetaList(self):\r
-        taskFile = TaskFile()\r
-        return [taskFile.generateFromStep(self)]\r
-\r
-\r
-class NormalTask(BaseWorkflowTask):\r
-    def __init__(self, order, stepObjArr, flowParser):\r
-        relatedStepObj = stepObjArr[order['step'] - 1]\r
-        super(NormalTask, self).__init__(relatedStepObj.getName())\r
-        self._taskReferenceName = "task_%s" % getRandString(10)\r
-        self._stepId = relatedStepObj.getId()\r
-        self._type = "HTTP"\r
-        self._args['inputParameters'] = relatedStepObj.getArgs()\r
-        self._paramTransform(self._args['inputParameters'], flowParser)\r
-        print "NormalTask:----------------------\n", relatedStepObj.getArgs()\r
-\r
-    def _paramTransform(self, argsDict, flowParser):\r
-        for (k, v) in argsDict.items():\r
-            if isinstance(v, str):\r
-                if re.match("^\(\(\d+\..*\)\)", v):\r
-                    v = v[2:-2]\r
-                    stepId, outputParam = v.split(".")\r
-                    stepId = int(stepId)\r
-                    normalTask = flowParser.getNormalTask(stepId)\r
-                    if normalTask is None:\r
-                        continue\r
-                    argsDict[k] = "${%s.output.response.body.%s}" %  \\r
-                        (normalTask.getReferenceName(), outputParam)\r
-            elif isinstance(v, dict):\r
-                self._paramTransform(v, flowParser)\r
-\r
-    def getStepId(self):\r
-        return self._stepId\r
-\r
-\r
-class SwitchTask(BaseWorkflowTask):\r
-    seqNumber = 0\r
-\r
-    def __init__(self, order, stepObjArr, flowParser):\r
-        super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber))\r
-        SwitchTask.seqNumber = SwitchTask.seqNumber + 1\r
-        if 'name' in order:\r
-            self._name = order['name']\r
-        self._type = "DECISION"\r
-        caseValueParam = 'value'\r
-        order['value'] = order['value'][2:-2]\r
-        stepId, outputParam = order['value'].split(".")\r
-        stepId = int(stepId)\r
-        normalTask = flowParser.getNormalTask(stepId)\r
-        caseValue = "${%s.output.response.body.%s}" % \\r
-            (normalTask.getReferenceName(), outputParam)\r
-        self._args['inputParameters'] = {caseValueParam: caseValue}\r
-        self._args['caseValueParam'] = caseValueParam\r
-        self._args['decisionCases'] = {}\r
-        self._childTaskMetaList = []\r
-        for case, caseOrders in order['cases'].items():\r
-            self._args['decisionCases'][case], taskMetaList =  \\r
-                flowParser.parse(caseOrders, stepObjArr)\r
-            if taskMetaList is not None:\r
-                self._childTaskMetaList.extend(taskMetaList)\r
-\r
-    def getTaskMetaList(self):\r
-        selfTaskMetaList = super(SwitchTask, self).getTaskMetaList()\r
-        selfTaskMetaList.extend(self._childTaskMetaList)\r
-        return selfTaskMetaList\r
-\r
-\r
-class ParallelTask(BaseWorkflowTask):\r
-    seqNumber = 0\r
-\r
-    def __init__(self, order, stepObjArr, flowParser):\r
-        InstSeqNumber = ParallelTask.seqNumber\r
-        super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber))\r
-        ParallelTask.seqNumber = ParallelTask.seqNumber + 1\r
-        if 'name' in order:\r
-            self._name = order['name']\r
-        self._type = "FORK_JOIN"\r
-        self._args['forkTasks'] = []\r
-        self._childTaskMetaList = []\r
-        lastTasksNameList = []\r
-        parallelList = order['parallel'].items()\r
-        parallelList.sort()\r
-        for key, orderList in parallelList:\r
-            print orderList\r
-            taskList, taskMetaList = flowParser.parse(orderList, stepObjArr)\r
-            self._args['forkTasks'].append(taskList)\r
-            lastTasksNameList.append(taskList[-1]['taskReferenceName'])\r
-            if taskMetaList is not None:\r
-                self._childTaskMetaList.extend(taskMetaList)\r
-        self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList)\r
-\r
-    def getTaskMetaList(self):\r
-        selfTaskMetaList = super(ParallelTask, self).getTaskMetaList()\r
-        selfTaskMetaList.extend(self._childTaskMetaList)\r
-        selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList())\r
-        return selfTaskMetaList\r
-\r
-    def getJoinTask(self):\r
-        return self._joinTaskObj\r
-\r
-\r
-class ParallelJoinTask(BaseWorkflowTask):\r
-    def __init__(self, seqNumber, joinOnList):\r
-        super(ParallelJoinTask, self).__init__(\r
-            "paralleljoin_" + str(seqNumber))\r
-        self._type = "JOIN"\r
-        self._args['joinOn'] = joinOnList\r
-\r
-\r
-def getRandString(length):\r
-    return "".join(random.choice(str("0123456789")) for i in range(length))\r
+##############################################################################
+# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import random
+import collections
+import re
+from src.conductor_processor.task import TaskFile
+
+
+class WorkflowFile(object):
+    def __init__(self, name):
+        self._name = "workflow_" + name + "(%s)" % getRandString(10)
+        self._description = ''
+        self._version = 1
+        self._schemaVersion = 2
+        self._tasks = []
+        self._outputParameters = {}
+
+    def getDict(self):
+        d = collections.OrderedDict()
+        d['name'] = self._name
+        d['description'] = self._description
+        d['version'] = self._version
+        d['schemaVersion'] = self._schemaVersion
+        d['tasks'] = self._tasks
+        d['outputParameters'] = self._outputParameters
+
+        return d
+
+    def generateMetaData(self, flowList, stepObjArr):
+        flowParser = FlowParser(flowList, stepObjArr)
+        self._tasks, taskMetaList = flowParser.parseMainFlow()
+        normalTasks = flowParser.getNormalTaskList()
+        for normalTask in normalTasks:
+            taskName = normalTask['name']
+            referenceName = normalTask['taskReferenceName']
+            self._outputParameters["%s(%s)" % (taskName, referenceName)] = \
+                "${%s.output.response.body}" % referenceName
+        return self.getDict(), taskMetaList
+
+
+class FlowParser(object):
+    def __init__(self, flowList, stepObjArr):
+        self._mainFlow = {}
+        self._subFlowDict = {}
+        self._stepObjArr = stepObjArr
+        self._normalTasks = []
+        for flow in flowList:
+            if flow['name'] == "main":
+                self._mainFlow = flow
+            else:
+                self._subFlowDict[flow['name']] = flow
+
+    def parseMainFlow(self):
+        return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr)
+
+    def parse(self, obj, stepObjArr):
+        if isinstance(obj, str):
+            return self.parseFlow(obj, stepObjArr)
+        else:
+            return self.parseOrderList(obj, stepObjArr)
+
+    def parseFlow(self, flowName, stepObjArr):
+        orderList = self._subFlowDict[flowName]['orders']
+        return self.parseOrderList(orderList, stepObjArr)
+
+    def parseOrderList(self, orderList, stepObjArr):
+        tasks = []
+        taskMetaAllList = []
+        for order in orderList:
+            if order['type'] == "normal":
+                genTask = NormalTask(order, stepObjArr, self)
+                self._normalTasks.append(genTask)
+            elif order['type'] == "switch":
+                genTask = SwitchTask(order, stepObjArr, self)
+            elif order['type'] == "parallel":
+                genTask = ParallelTask(order, stepObjArr, self)
+            tasks.append(genTask.getDict())
+
+            if order['type'] == "parallel":
+                joinTask = genTask.getJoinTask()
+                tasks.append(joinTask.getDict())
+
+            taskMetaList = genTask.getTaskMetaList()
+            if taskMetaList is not None:
+                taskMetaAllList.extend(taskMetaList)
+        return tasks, taskMetaAllList
+
+    def getNormalTaskList(self):
+        normalTasksDict = []
+        for normalTask in self._normalTasks:
+            normalTasksDict.append(normalTask.getDict())
+        return normalTasksDict
+
+    def getNormalTask(self, stepId):
+        for normalTask in self._normalTasks:
+            if normalTask.getStepId() == stepId:
+                return normalTask
+        return None
+
+
+class BaseWorkflowTask(object):
+    def __init__(self, name):
+        self._name = name
+        self._taskReferenceName = self._name + "_task_%s" % getRandString(10)
+        self._type = ''
+        self._args = {}
+
+    def __str__(self):
+        dictObj = self.getDict()
+        return str(dictObj)
+
+    def getDict(self):
+        d1 = {
+            "name": self._name,
+            "taskReferenceName": self._taskReferenceName,
+            "type": self._type
+        }
+        return dict(d1, **self._args)
+
+    def getName(self):
+        return self._name
+
+    def getReferenceName(self):
+        return self._taskReferenceName
+
+    def getTaskMetaList(self):
+        taskFile = TaskFile()
+        return [taskFile.generateFromStep(self)]
+
+
+class NormalTask(BaseWorkflowTask):
+    def __init__(self, order, stepObjArr, flowParser):
+        relatedStepObj = stepObjArr[order['step'] - 1]
+        super(NormalTask, self).__init__(relatedStepObj.getName())
+        self._taskReferenceName = "task_%s" % getRandString(10)
+        self._stepId = relatedStepObj.getId()
+        self._type = "HTTP"
+        self._args['inputParameters'] = relatedStepObj.getArgs()
+        self._paramTransform(self._args['inputParameters'], flowParser)
+        print "NormalTask:----------------------\n", relatedStepObj.getArgs()
+
+    def _paramTransform(self, argsDict, flowParser):
+        for (k, v) in argsDict.items():
+            if isinstance(v, str):
+                if re.match("^\(\(\d+\..*\)\)", v):
+                    v = v[2:-2]
+                    stepId, outputParam = v.split(".")
+                    stepId = int(stepId)
+                    normalTask = flowParser.getNormalTask(stepId)
+                    if normalTask is None:
+                        continue
+                    argsDict[k] = "${%s.output.response.body.%s}" %  \
+                        (normalTask.getReferenceName(), outputParam)
+            elif isinstance(v, dict):
+                self._paramTransform(v, flowParser)
+
+    def getStepId(self):
+        return self._stepId
+
+
+class SwitchTask(BaseWorkflowTask):
+    seqNumber = 0
+
+    def __init__(self, order, stepObjArr, flowParser):
+        super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber))
+        SwitchTask.seqNumber = SwitchTask.seqNumber + 1
+        if 'name' in order:
+            self._name = order['name']
+        self._type = "DECISION"
+        caseValueParam = 'value'
+        order['value'] = order['value'][2:-2]
+        stepId, outputParam = order['value'].split(".")
+        stepId = int(stepId)
+        normalTask = flowParser.getNormalTask(stepId)
+        caseValue = "${%s.output.response.body.%s}" % \
+            (normalTask.getReferenceName(), outputParam)
+        self._args['inputParameters'] = {caseValueParam: caseValue}
+        self._args['caseValueParam'] = caseValueParam
+        self._args['decisionCases'] = {}
+        self._childTaskMetaList = []
+        for case, caseOrders in order['cases'].items():
+            self._args['decisionCases'][case], taskMetaList =  \
+                flowParser.parse(caseOrders, stepObjArr)
+            if taskMetaList is not None:
+                self._childTaskMetaList.extend(taskMetaList)
+
+    def getTaskMetaList(self):
+        selfTaskMetaList = super(SwitchTask, self).getTaskMetaList()
+        selfTaskMetaList.extend(self._childTaskMetaList)
+        return selfTaskMetaList
+
+
+class ParallelTask(BaseWorkflowTask):
+    seqNumber = 0
+
+    def __init__(self, order, stepObjArr, flowParser):
+        InstSeqNumber = ParallelTask.seqNumber
+        super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber))
+        ParallelTask.seqNumber = ParallelTask.seqNumber + 1
+        if 'name' in order:
+            self._name = order['name']
+        self._type = "FORK_JOIN"
+        self._args['forkTasks'] = []
+        self._childTaskMetaList = []
+        lastTasksNameList = []
+        parallelList = order['parallel'].items()
+        parallelList.sort()
+        for key, orderList in parallelList:
+            print orderList
+            taskList, taskMetaList = flowParser.parse(orderList, stepObjArr)
+            self._args['forkTasks'].append(taskList)
+            lastTasksNameList.append(taskList[-1]['taskReferenceName'])
+            if taskMetaList is not None:
+                self._childTaskMetaList.extend(taskMetaList)
+        self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList)
+
+    def getTaskMetaList(self):
+        selfTaskMetaList = super(ParallelTask, self).getTaskMetaList()
+        selfTaskMetaList.extend(self._childTaskMetaList)
+        selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList())
+        return selfTaskMetaList
+
+    def getJoinTask(self):
+        return self._joinTaskObj
+
+
+class ParallelJoinTask(BaseWorkflowTask):
+    def __init__(self, seqNumber, joinOnList):
+        super(ParallelJoinTask, self).__init__(
+            "paralleljoin_" + str(seqNumber))
+        self._type = "JOIN"
+        self._args['joinOn'] = joinOnList
+
+
+def getRandString(length):
+    return "".join(random.choice(str("0123456789")) for i in range(length))