+##############################################################################\r
+# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others\r
+#\r
+# All rights reserved. This program and the accompanying materials\r
+# are made available under the terms of the Apache License, Version 2.0\r
+# which accompanies this distribution, and is available at\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+##############################################################################\r
+\r
+import random\r
+import collections\r
+import re\r
+from src.conductor_processor.task import TaskFile\r
+\r
+\r
+class WorkflowFile(object):\r
+ def __init__(self, name):\r
+ self._name = "workflow_" + name + "(%s)" % getRandString(10)\r
+ self._description = ''\r
+ self._version = 1\r
+ self._schemaVersion = 2\r
+ self._tasks = []\r
+ self._outputParameters = {}\r
+\r
+ def getDict(self):\r
+ d = collections.OrderedDict()\r
+ d['name'] = self._name\r
+ d['description'] = self._description\r
+ d['version'] = self._version\r
+ d['schemaVersion'] = self._schemaVersion\r
+ d['tasks'] = self._tasks\r
+ d['outputParameters'] = self._outputParameters\r
+\r
+ return d\r
+\r
+ def generateMetaData(self, flowList, stepObjArr):\r
+ flowParser = FlowParser(flowList, stepObjArr)\r
+ self._tasks, taskMetaList = flowParser.parseMainFlow()\r
+ normalTasks = flowParser.getNormalTaskList()\r
+ for normalTask in normalTasks:\r
+ taskName = normalTask['name']\r
+ referenceName = normalTask['taskReferenceName']\r
+ self._outputParameters["%s(%s)" % (taskName, referenceName)] = \\r
+ "${%s.output.response.body}" % referenceName\r
+ return self.getDict(), taskMetaList\r
+\r
+\r
+class FlowParser(object):\r
+ def __init__(self, flowList, stepObjArr):\r
+ self._mainFlow = {}\r
+ self._subFlowDict = {}\r
+ self._stepObjArr = stepObjArr\r
+ self._normalTasks = []\r
+ for flow in flowList:\r
+ if flow['name'] == "main":\r
+ self._mainFlow = flow\r
+ else:\r
+ self._subFlowDict[flow['name']] = flow\r
+\r
+ def parseMainFlow(self):\r
+ return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr)\r
+\r
+ def parse(self, obj, stepObjArr):\r
+ if isinstance(obj, str):\r
+ return self.parseFlow(obj, stepObjArr)\r
+ else:\r
+ return self.parseOrderList(obj, stepObjArr)\r
+\r
+ def parseFlow(self, flowName, stepObjArr):\r
+ orderList = self._subFlowDict[flowName]['orders']\r
+ return self.parseOrderList(orderList, stepObjArr)\r
+\r
+ def parseOrderList(self, orderList, stepObjArr):\r
+ tasks = []\r
+ taskMetaAllList = []\r
+ for order in orderList:\r
+ if order['type'] == "normal":\r
+ genTask = NormalTask(order, stepObjArr, self)\r
+ self._normalTasks.append(genTask)\r
+ elif order['type'] == "switch":\r
+ genTask = SwitchTask(order, stepObjArr, self)\r
+ elif order['type'] == "parallel":\r
+ genTask = ParallelTask(order, stepObjArr, self)\r
+ tasks.append(genTask.getDict())\r
+\r
+ if order['type'] == "parallel":\r
+ joinTask = genTask.getJoinTask()\r
+ tasks.append(joinTask.getDict())\r
+\r
+ taskMetaList = genTask.getTaskMetaList()\r
+ if taskMetaList is not None:\r
+ taskMetaAllList.extend(taskMetaList)\r
+ return tasks, taskMetaAllList\r
+\r
+ def getNormalTaskList(self):\r
+ normalTasksDict = []\r
+ for normalTask in self._normalTasks:\r
+ normalTasksDict.append(normalTask.getDict())\r
+ return normalTasksDict\r
+\r
+ def getNormalTask(self, stepId):\r
+ for normalTask in self._normalTasks:\r
+ if normalTask.getStepId() == stepId:\r
+ return normalTask\r
+ return None\r
+\r
+\r
+class BaseWorkflowTask(object):\r
+ def __init__(self, name):\r
+ self._name = name\r
+ self._taskReferenceName = self._name + "_task_%s" % getRandString(10)\r
+ self._type = ''\r
+ self._args = {}\r
+\r
+ def __str__(self):\r
+ dictObj = self.getDict()\r
+ return str(dictObj)\r
+\r
+ def getDict(self):\r
+ d1 = {\r
+ "name": self._name,\r
+ "taskReferenceName": self._taskReferenceName,\r
+ "type": self._type\r
+ }\r
+ return dict(d1, **self._args)\r
+\r
+ def getName(self):\r
+ return self._name\r
+\r
+ def getReferenceName(self):\r
+ return self._taskReferenceName\r
+\r
+ def getTaskMetaList(self):\r
+ taskFile = TaskFile()\r
+ return [taskFile.generateFromStep(self)]\r
+\r
+\r
+class NormalTask(BaseWorkflowTask):\r
+ def __init__(self, order, stepObjArr, flowParser):\r
+ relatedStepObj = stepObjArr[order['step'] - 1]\r
+ super(NormalTask, self).__init__(relatedStepObj.getName())\r
+ self._taskReferenceName = "task_%s" % getRandString(10)\r
+ self._stepId = relatedStepObj.getId()\r
+ self._type = "HTTP"\r
+ self._args['inputParameters'] = relatedStepObj.getArgs()\r
+ self._paramTransform(self._args['inputParameters'], flowParser)\r
+ print "NormalTask:----------------------\n", relatedStepObj.getArgs()\r
+\r
+ def _paramTransform(self, argsDict, flowParser):\r
+ for (k, v) in argsDict.items():\r
+ if isinstance(v, str):\r
+ if re.match("^\(\(\d+\..*\)\)", v):\r
+ v = v[2:-2]\r
+ stepId, outputParam = v.split(".")\r
+ stepId = int(stepId)\r
+ normalTask = flowParser.getNormalTask(stepId)\r
+ if normalTask is None:\r
+ continue\r
+ argsDict[k] = "${%s.output.response.body.%s}" % \\r
+ (normalTask.getReferenceName(), outputParam)\r
+ elif isinstance(v, dict):\r
+ self._paramTransform(v, flowParser)\r
+\r
+ def getStepId(self):\r
+ return self._stepId\r
+\r
+\r
+class SwitchTask(BaseWorkflowTask):\r
+ seqNumber = 0\r
+\r
+ def __init__(self, order, stepObjArr, flowParser):\r
+ super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber))\r
+ SwitchTask.seqNumber = SwitchTask.seqNumber + 1\r
+ if 'name' in order:\r
+ self._name = order['name']\r
+ self._type = "DECISION"\r
+ caseValueParam = 'value'\r
+ order['value'] = order['value'][2:-2]\r
+ stepId, outputParam = order['value'].split(".")\r
+ stepId = int(stepId)\r
+ normalTask = flowParser.getNormalTask(stepId)\r
+ caseValue = "${%s.output.response.body.%s}" % \\r
+ (normalTask.getReferenceName(), outputParam)\r
+ self._args['inputParameters'] = {caseValueParam: caseValue}\r
+ self._args['caseValueParam'] = caseValueParam\r
+ self._args['decisionCases'] = {}\r
+ self._childTaskMetaList = []\r
+ for case, caseOrders in order['cases'].items():\r
+ self._args['decisionCases'][case], taskMetaList = \\r
+ flowParser.parse(caseOrders, stepObjArr)\r
+ if taskMetaList is not None:\r
+ self._childTaskMetaList.extend(taskMetaList)\r
+\r
+ def getTaskMetaList(self):\r
+ selfTaskMetaList = super(SwitchTask, self).getTaskMetaList()\r
+ selfTaskMetaList.extend(self._childTaskMetaList)\r
+ return selfTaskMetaList\r
+\r
+\r
+class ParallelTask(BaseWorkflowTask):\r
+ seqNumber = 0\r
+\r
+ def __init__(self, order, stepObjArr, flowParser):\r
+ InstSeqNumber = ParallelTask.seqNumber\r
+ super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber))\r
+ ParallelTask.seqNumber = ParallelTask.seqNumber + 1\r
+ if 'name' in order:\r
+ self._name = order['name']\r
+ self._type = "FORK_JOIN"\r
+ self._args['forkTasks'] = []\r
+ self._childTaskMetaList = []\r
+ lastTasksNameList = []\r
+ parallelList = order['parallel'].items()\r
+ parallelList.sort()\r
+ for key, orderList in parallelList:\r
+ print orderList\r
+ taskList, taskMetaList = flowParser.parse(orderList, stepObjArr)\r
+ self._args['forkTasks'].append(taskList)\r
+ lastTasksNameList.append(taskList[-1]['taskReferenceName'])\r
+ if taskMetaList is not None:\r
+ self._childTaskMetaList.extend(taskMetaList)\r
+ self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList)\r
+\r
+ def getTaskMetaList(self):\r
+ selfTaskMetaList = super(ParallelTask, self).getTaskMetaList()\r
+ selfTaskMetaList.extend(self._childTaskMetaList)\r
+ selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList())\r
+ return selfTaskMetaList\r
+\r
+ def getJoinTask(self):\r
+ return self._joinTaskObj\r
+\r
+\r
+class ParallelJoinTask(BaseWorkflowTask):\r
+ def __init__(self, seqNumber, joinOnList):\r
+ super(ParallelJoinTask, self).__init__(\r
+ "paralleljoin_" + str(seqNumber))\r
+ self._type = "JOIN"\r
+ self._args['joinOn'] = joinOnList\r
+\r
+\r
+def getRandString(length):\r
+ return "".join(random.choice(str("0123456789")) for i in range(length))\r