function of generating workflow config file in server part of testing-scheduler 05/60005/4
authorLeoQi <QibinZheng2014@tongji.edu.cn>
Thu, 19 Jul 2018 18:36:15 +0000 (02:36 +0800)
committerLeoQi <QibinZheng2014@tongji.edu.cn>
Thu, 30 Aug 2018 10:49:51 +0000 (18:49 +0800)
JIRA BOTTLENECK-233

A module to help generating the config files which are sent to conductor server.
Based on these config files, conductor can start a workflow.

Change-Id: Ibdd71be6454fe2bd2b8718da0c300827897959e0
Signed-off-by: Zheng Qibin <QibinZheng2014@tongji.edu.cn>
testing-scheduler/server/src/conductor_processor/__init__.py [new file with mode: 0644]
testing-scheduler/server/src/conductor_processor/defaultTaskFile.json [new file with mode: 0644]
testing-scheduler/server/src/conductor_processor/defaultWorkflowFile.json [new file with mode: 0644]
testing-scheduler/server/src/conductor_processor/task.py [new file with mode: 0644]
testing-scheduler/server/src/conductor_processor/workflow.py [new file with mode: 0644]

diff --git a/testing-scheduler/server/src/conductor_processor/__init__.py b/testing-scheduler/server/src/conductor_processor/__init__.py
new file mode 100644 (file)
index 0000000..bb02be1
--- /dev/null
@@ -0,0 +1,8 @@
+##############################################################################\r
+# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others\r
+#\r
+# All rights reserved. This program and the accompanying materials\r
+# are made available under the terms of the Apache License, Version 2.0\r
+# which accompanies this distribution, and is available at\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+##############################################################################\r
diff --git a/testing-scheduler/server/src/conductor_processor/defaultTaskFile.json b/testing-scheduler/server/src/conductor_processor/defaultTaskFile.json
new file mode 100644 (file)
index 0000000..a98a581
--- /dev/null
@@ -0,0 +1,9 @@
+{\r
+    "name": "",\r
+    "retryCount": 6,\r
+    "timeOutSeconds": 1200,\r
+    "timeOutPolicy": "TIME_OUT_WF",\r
+    "retryLogic": "FIXED",\r
+    "retryDelaySeconds": 3,\r
+    "responseTimeOutSeconds": 3600\r
+}
\ No newline at end of file
diff --git a/testing-scheduler/server/src/conductor_processor/defaultWorkflowFile.json b/testing-scheduler/server/src/conductor_processor/defaultWorkflowFile.json
new file mode 100644 (file)
index 0000000..8f6251c
--- /dev/null
@@ -0,0 +1,24 @@
+{\r
+    "name": "workflow_demo_05",\r
+    "description": "run a workflow of yardstick test service",\r
+    "version": 1,\r
+    "tasks": [\r
+        {\r
+            "name": "http_yardstick_test",\r
+            "taskReferenceName": "ping_test",\r
+            "inputParameters": {\r
+                "http_request": {\r
+                    "uri": "http://192.168.199.105:8080/greet",\r
+                    "method": "GET"\r
+                }\r
+            },\r
+            "type": "HTTP"\r
+        }\r
+    ],\r
+    "outputParameters": {\r
+        "header": "${ping_test.output.response.headers}",\r
+        "response": "${ping_test.output.response.body}",\r
+        "status": "${ping_test.output.response.statusCode}"\r
+    },\r
+    "schemaVersion": 2\r
+}
\ No newline at end of file
diff --git a/testing-scheduler/server/src/conductor_processor/task.py b/testing-scheduler/server/src/conductor_processor/task.py
new file mode 100644 (file)
index 0000000..6f25aef
--- /dev/null
@@ -0,0 +1,28 @@
+##############################################################################\r
+# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others\r
+#\r
+# All rights reserved. This program and the accompanying materials\r
+# are made available under the terms of the Apache License, Version 2.0\r
+# which accompanies this distribution, and is available at\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+##############################################################################\r
+\r
+import json\r
+import os\r
+\r
+\r
+class TaskFile(object):\r
+    def __init__(self, taskName='task_0'):\r
+        self._defaultConfFile = self._getFilePath("defaultTaskFile.json")\r
+        with open(self._defaultConfFile) as defaultConf:\r
+            self._jsonObj = json.load(defaultConf)\r
+        self._jsonObj['name'] = taskName\r
+\r
+    def generateFromStep(self, stepObject):\r
+        self._jsonObj['name'] = stepObject.getName()\r
+        print "taskFile:", self._jsonObj['name']\r
+        return self._jsonObj\r
+\r
+    def _getFilePath(self, fileName):\r
+        dirPath = os.path.dirname(os.path.realpath(__file__))\r
+        return os.path.join(dirPath, fileName)\r
diff --git a/testing-scheduler/server/src/conductor_processor/workflow.py b/testing-scheduler/server/src/conductor_processor/workflow.py
new file mode 100644 (file)
index 0000000..19f0896
--- /dev/null
@@ -0,0 +1,243 @@
+##############################################################################\r
+# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others\r
+#\r
+# All rights reserved. This program and the accompanying materials\r
+# are made available under the terms of the Apache License, Version 2.0\r
+# which accompanies this distribution, and is available at\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+##############################################################################\r
+\r
+import random\r
+import collections\r
+import re\r
+from src.conductor_processor.task import TaskFile\r
+\r
+\r
+class WorkflowFile(object):\r
+    def __init__(self, name):\r
+        self._name = "workflow_" + name + "(%s)" % getRandString(10)\r
+        self._description = ''\r
+        self._version = 1\r
+        self._schemaVersion = 2\r
+        self._tasks = []\r
+        self._outputParameters = {}\r
+\r
+    def getDict(self):\r
+        d = collections.OrderedDict()\r
+        d['name'] = self._name\r
+        d['description'] = self._description\r
+        d['version'] = self._version\r
+        d['schemaVersion'] = self._schemaVersion\r
+        d['tasks'] = self._tasks\r
+        d['outputParameters'] = self._outputParameters\r
+\r
+        return d\r
+\r
+    def generateMetaData(self, flowList, stepObjArr):\r
+        flowParser = FlowParser(flowList, stepObjArr)\r
+        self._tasks, taskMetaList = flowParser.parseMainFlow()\r
+        normalTasks = flowParser.getNormalTaskList()\r
+        for normalTask in normalTasks:\r
+            taskName = normalTask['name']\r
+            referenceName = normalTask['taskReferenceName']\r
+            self._outputParameters["%s(%s)" % (taskName, referenceName)] = \\r
+                "${%s.output.response.body}" % referenceName\r
+        return self.getDict(), taskMetaList\r
+\r
+\r
+class FlowParser(object):\r
+    def __init__(self, flowList, stepObjArr):\r
+        self._mainFlow = {}\r
+        self._subFlowDict = {}\r
+        self._stepObjArr = stepObjArr\r
+        self._normalTasks = []\r
+        for flow in flowList:\r
+            if flow['name'] == "main":\r
+                self._mainFlow = flow\r
+            else:\r
+                self._subFlowDict[flow['name']] = flow\r
+\r
+    def parseMainFlow(self):\r
+        return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr)\r
+\r
+    def parse(self, obj, stepObjArr):\r
+        if isinstance(obj, str):\r
+            return self.parseFlow(obj, stepObjArr)\r
+        else:\r
+            return self.parseOrderList(obj, stepObjArr)\r
+\r
+    def parseFlow(self, flowName, stepObjArr):\r
+        orderList = self._subFlowDict[flowName]['orders']\r
+        return self.parseOrderList(orderList, stepObjArr)\r
+\r
+    def parseOrderList(self, orderList, stepObjArr):\r
+        tasks = []\r
+        taskMetaAllList = []\r
+        for order in orderList:\r
+            if order['type'] == "normal":\r
+                genTask = NormalTask(order, stepObjArr, self)\r
+                self._normalTasks.append(genTask)\r
+            elif order['type'] == "switch":\r
+                genTask = SwitchTask(order, stepObjArr, self)\r
+            elif order['type'] == "parallel":\r
+                genTask = ParallelTask(order, stepObjArr, self)\r
+            tasks.append(genTask.getDict())\r
+\r
+            if order['type'] == "parallel":\r
+                joinTask = genTask.getJoinTask()\r
+                tasks.append(joinTask.getDict())\r
+\r
+            taskMetaList = genTask.getTaskMetaList()\r
+            if taskMetaList is not None:\r
+                taskMetaAllList.extend(taskMetaList)\r
+        return tasks, taskMetaAllList\r
+\r
+    def getNormalTaskList(self):\r
+        normalTasksDict = []\r
+        for normalTask in self._normalTasks:\r
+            normalTasksDict.append(normalTask.getDict())\r
+        return normalTasksDict\r
+\r
+    def getNormalTask(self, stepId):\r
+        for normalTask in self._normalTasks:\r
+            if normalTask.getStepId() == stepId:\r
+                return normalTask\r
+        return None\r
+\r
+\r
+class BaseWorkflowTask(object):\r
+    def __init__(self, name):\r
+        self._name = name\r
+        self._taskReferenceName = self._name + "_task_%s" % getRandString(10)\r
+        self._type = ''\r
+        self._args = {}\r
+\r
+    def __str__(self):\r
+        dictObj = self.getDict()\r
+        return str(dictObj)\r
+\r
+    def getDict(self):\r
+        d1 = {\r
+            "name": self._name,\r
+            "taskReferenceName": self._taskReferenceName,\r
+            "type": self._type\r
+        }\r
+        return dict(d1, **self._args)\r
+\r
+    def getName(self):\r
+        return self._name\r
+\r
+    def getReferenceName(self):\r
+        return self._taskReferenceName\r
+\r
+    def getTaskMetaList(self):\r
+        taskFile = TaskFile()\r
+        return [taskFile.generateFromStep(self)]\r
+\r
+\r
+class NormalTask(BaseWorkflowTask):\r
+    def __init__(self, order, stepObjArr, flowParser):\r
+        relatedStepObj = stepObjArr[order['step'] - 1]\r
+        super(NormalTask, self).__init__(relatedStepObj.getName())\r
+        self._taskReferenceName = "task_%s" % getRandString(10)\r
+        self._stepId = relatedStepObj.getId()\r
+        self._type = "HTTP"\r
+        self._args['inputParameters'] = relatedStepObj.getArgs()\r
+        self._paramTransform(self._args['inputParameters'], flowParser)\r
+        print "NormalTask:----------------------\n", relatedStepObj.getArgs()\r
+\r
+    def _paramTransform(self, argsDict, flowParser):\r
+        for (k, v) in argsDict.items():\r
+            if isinstance(v, str):\r
+                if re.match("^\(\(\d+\..*\)\)", v):\r
+                    v = v[2:-2]\r
+                    stepId, outputParam = v.split(".")\r
+                    stepId = int(stepId)\r
+                    normalTask = flowParser.getNormalTask(stepId)\r
+                    if normalTask is None:\r
+                        continue\r
+                    argsDict[k] = "${%s.output.response.body.%s}" %  \\r
+                        (normalTask.getReferenceName(), outputParam)\r
+            elif isinstance(v, dict):\r
+                self._paramTransform(v, flowParser)\r
+\r
+    def getStepId(self):\r
+        return self._stepId\r
+\r
+\r
+class SwitchTask(BaseWorkflowTask):\r
+    seqNumber = 0\r
+\r
+    def __init__(self, order, stepObjArr, flowParser):\r
+        super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber))\r
+        SwitchTask.seqNumber = SwitchTask.seqNumber + 1\r
+        if 'name' in order:\r
+            self._name = order['name']\r
+        self._type = "DECISION"\r
+        caseValueParam = 'value'\r
+        order['value'] = order['value'][2:-2]\r
+        stepId, outputParam = order['value'].split(".")\r
+        stepId = int(stepId)\r
+        normalTask = flowParser.getNormalTask(stepId)\r
+        caseValue = "${%s.output.response.body.%s}" % \\r
+            (normalTask.getReferenceName(), outputParam)\r
+        self._args['inputParameters'] = {caseValueParam: caseValue}\r
+        self._args['caseValueParam'] = caseValueParam\r
+        self._args['decisionCases'] = {}\r
+        self._childTaskMetaList = []\r
+        for case, caseOrders in order['cases'].items():\r
+            self._args['decisionCases'][case], taskMetaList =  \\r
+                flowParser.parse(caseOrders, stepObjArr)\r
+            if taskMetaList is not None:\r
+                self._childTaskMetaList.extend(taskMetaList)\r
+\r
+    def getTaskMetaList(self):\r
+        selfTaskMetaList = super(SwitchTask, self).getTaskMetaList()\r
+        selfTaskMetaList.extend(self._childTaskMetaList)\r
+        return selfTaskMetaList\r
+\r
+\r
+class ParallelTask(BaseWorkflowTask):\r
+    seqNumber = 0\r
+\r
+    def __init__(self, order, stepObjArr, flowParser):\r
+        InstSeqNumber = ParallelTask.seqNumber\r
+        super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber))\r
+        ParallelTask.seqNumber = ParallelTask.seqNumber + 1\r
+        if 'name' in order:\r
+            self._name = order['name']\r
+        self._type = "FORK_JOIN"\r
+        self._args['forkTasks'] = []\r
+        self._childTaskMetaList = []\r
+        lastTasksNameList = []\r
+        parallelList = order['parallel'].items()\r
+        parallelList.sort()\r
+        for key, orderList in parallelList:\r
+            print orderList\r
+            taskList, taskMetaList = flowParser.parse(orderList, stepObjArr)\r
+            self._args['forkTasks'].append(taskList)\r
+            lastTasksNameList.append(taskList[-1]['taskReferenceName'])\r
+            if taskMetaList is not None:\r
+                self._childTaskMetaList.extend(taskMetaList)\r
+        self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList)\r
+\r
+    def getTaskMetaList(self):\r
+        selfTaskMetaList = super(ParallelTask, self).getTaskMetaList()\r
+        selfTaskMetaList.extend(self._childTaskMetaList)\r
+        selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList())\r
+        return selfTaskMetaList\r
+\r
+    def getJoinTask(self):\r
+        return self._joinTaskObj\r
+\r
+\r
+class ParallelJoinTask(BaseWorkflowTask):\r
+    def __init__(self, seqNumber, joinOnList):\r
+        super(ParallelJoinTask, self).__init__(\r
+            "paralleljoin_" + str(seqNumber))\r
+        self._type = "JOIN"\r
+        self._args['joinOn'] = joinOnList\r
+\r
+\r
+def getRandString(length):\r
+    return "".join(random.choice(str("0123456789")) for i in range(length))\r