1 ##############################################################################
2 # Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
import json

from conductor import conductor
class WorkflowMgr(object):
    """Register task and workflow definitions, then start the workflow.

    Wraps a ``conductor.MetadataClient`` and a ``conductor.WorkflowClient``
    that both talk to ``<serverAddr>/api``.  A workflow can only be started
    after task definitions and a workflow definition have been set on this
    instance.
    """

    def __init__(self, serverAddr):
        """Create the metadata and workflow clients.

        Args:
            serverAddr: Base URL of the server; ``'/api'`` is appended to it.
        """
        self._serverAddr = serverAddr + '/api'
        self._metaDataClient = conductor.MetadataClient(self._serverAddr)
        self._workflowClient = conductor.WorkflowClient(self._serverAddr)
        # startWorkflow() refuses to run until both flags are True.
        self._tasksDefined = False
        self._workflowDefined = False
        self._workflowName = ""

    def setTaskDef(self, taskJson):
        """Register the task definitions contained in ``taskJson``.

        Args:
            taskJson: JSON text whose top level is an object.  Each value of
                that object is passed to ``registerTaskDefs``; the keys are
                not used.

        Raises:
            ValueError: If ``taskJson`` is not valid JSON.
        """
        jsonObj = json.loads(taskJson)
        # Single-argument print() gives the same output on Python 2 and 3.
        print("define tasks:\n%s" % taskJson)
        for taskDefs in jsonObj.values():
            self._metaDataClient.registerTaskDefs(taskDefs)
        self._tasksDefined = True

    def setWorkflowDef(self, workflowJson):
        """Register the workflow definition and remember its name.

        A failure of the registration call is reported on stdout but not
        re-raised, so the workflow name is still recorded and the workflow is
        still marked as defined.

        Args:
            workflowJson: JSON text of the workflow definition; it must have
                a top-level ``'name'`` entry.

        Raises:
            ValueError: If ``workflowJson`` is not valid JSON.
            KeyError: If the definition has no ``'name'`` entry.
        """
        jsonObj = json.loads(workflowJson)
        print("define workflow:\n%s" % workflowJson)
        try:
            self._metaDataClient.createWorkflowDef(jsonObj)
        except Exception as e:
            # Best effort: report the failure and carry on.
            # NOTE(review): presumably this tolerates a definition that is
            # already registered on the server -- confirm.
            print("error: failed to create workflow definition: %s" % e)
        self._workflowName = jsonObj['name']
        self._workflowDefined = True

    def startWorkflow(self, param=None):
        """Start the previously defined workflow.

        Args:
            param: Input passed to the workflow client; an empty dict is
                used when omitted.

        Returns:
            The value returned by the workflow client's ``startWorkflow``,
            or ``None`` when tasks or the workflow have not been defined yet
            (an error message is printed in that case).
        """
        # A fresh dict per call avoids sharing one mutable default object.
        if param is None:
            param = {}
        if not self._tasksDefined:
            print("error: please define the task at first\n")
            return None
        if not self._workflowDefined:
            print("error: please define the workflow at first\n")
            return None
        return self._workflowClient.startWorkflow(self._workflowName, param)

    def setTaskDefFromFile(self, taskFilePath):
        """Read ``taskFilePath`` and pass its content to ``setTaskDef``."""
        with open(taskFilePath, 'r') as f:
            self.setTaskDef(f.read())

    def setWorkflowFromFile(self, workflowFilePath):
        """Read ``workflowFilePath`` and pass its content to ``setWorkflowDef``."""
        with open(workflowFilePath, 'r') as f:
            self.setWorkflowDef(f.read())
def main(serverAddr="http://192.168.199.131:8080"):
    """Demo run: register the mock task and workflow files, then start it.

    Reads 'mock_tasks.json' and 'mock_workflow.json' from the current
    working directory and starts the workflow with a fixed input.

    Args:
        serverAddr: Base URL of the server to talk to.
    """
    wfMgr = WorkflowMgr(serverAddr)
    wfMgr.setTaskDefFromFile('mock_tasks.json')
    wfMgr.setWorkflowFromFile('mock_workflow.json')
    inputParam = {'input': 'fake'}
    wfMgr.startWorkflow(inputParam)


# Run the demo only when executed as a script, never on import.
if __name__ == "__main__":
    main()