##############################################################################
# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
\r
import json

from conductor import conductor
\r
class WorkflowMgr(object):
    """Register task/workflow definitions with conductor and start the workflow.

    Wraps a ``conductor.MetadataClient`` and a ``conductor.WorkflowClient``
    that both point at ``<serverAddr>/api``, and tracks whether the task
    definitions and the workflow definition have been submitted so that
    ``startWorkflow`` can refuse to run before both are in place.
    """

    def __init__(self, serverAddr):
        """Create the conductor clients.

        :param serverAddr: base address of the conductor server; the
            ``/api`` suffix is appended here.
        """
        self._serverAddr = serverAddr + '/api'
        self._metaDataClient = conductor.MetadataClient(self._serverAddr)
        self._workflowClient = conductor.WorkflowClient(self._serverAddr)
        self._tasksDefined = False
        self._workflowDefined = False
        self._workflowName = ""

    def setTaskDef(self, taskJson):
        """Register the task definitions contained in a JSON document.

        :param taskJson: JSON text whose top level is an object; every
            value of that object is handed to ``registerTaskDefs``.
        :raises ValueError: if ``taskJson`` is not valid JSON.
        """
        jsonObj = json.loads(taskJson)
        print("define tasks:\n%s" % taskJson)
        # Only the values are registered; the keys are not used.
        for taskDefs in jsonObj.values():
            self._metaDataClient.registerTaskDefs(taskDefs)
        self._tasksDefined = True

    def setWorkflowDef(self, workflowJson):
        """Submit a workflow definition and remember its name.

        :param workflowJson: JSON text of the workflow definition; it must
            contain a ``name`` key.
        :raises ValueError: if ``workflowJson`` is not valid JSON.
        :raises KeyError: if the definition has no ``name`` key.
        """
        jsonObj = json.loads(workflowJson)
        print("define workflow:\n%s" % workflowJson)
        try:
            self._metaDataClient.createWorkflowDef(jsonObj)
        except Exception as e:
            # Best effort: the failure is reported and the definition is
            # still recorded below.
            # NOTE(review): presumably this tolerates re-creating a workflow
            # that already exists on the server - confirm.
            print(e)
        self._workflowName = jsonObj['name']
        self._workflowDefined = True

    def startWorkflow(self, param=None):
        """Start the previously defined workflow.

        :param param: input passed to the workflow client; defaults to a
            fresh empty dict on every call.
        :returns: whatever the workflow client's ``startWorkflow`` returns,
            or ``''`` when the tasks or the workflow have not been defined
            yet (an error message is printed in that case).
        """
        # A fresh dict per call avoids sharing one mutable default object.
        if param is None:
            param = {}
        workflowId = ''
        if not self._tasksDefined:
            print("error: please define the task at first\n")
        elif not self._workflowDefined:
            print("error: please define the workflow at first\n")
        else:
            workflowId = self._workflowClient.startWorkflow(
                self._workflowName, param)
        return workflowId

    def setTaskDefFromFile(self, taskFilePath):
        """Read ``taskFilePath`` and register its content via ``setTaskDef``."""
        with open(taskFilePath, 'r') as f:
            self.setTaskDef(f.read())

    def setWorkflowFromFile(self, workflowFilePath):
        """Read ``workflowFilePath`` and submit it via ``setWorkflowDef``."""
        with open(workflowFilePath, 'r') as f:
            self.setWorkflowDef(f.read())
\r
def main(serverAddr="http://192.168.199.131:8080"):
    """Run the mock workflow end to end against a conductor server.

    Registers the task definitions from ``mock_tasks.json`` and the workflow
    from ``mock_workflow.json`` (both paths are relative to the current
    working directory), then starts the workflow with a fixed fake input.

    :param serverAddr: base address of the conductor server; defaults to
        the address this script was originally hard-wired to.
    """
    wfMgr = WorkflowMgr(serverAddr)
    wfMgr.setTaskDefFromFile('mock_tasks.json')
    wfMgr.setWorkflowFromFile('mock_workflow.json')
    inputParam = {'input': 'fake'}
    wfMgr.startWorkflow(inputParam)
\r
70 if __name__ == "__main__":
\r