Merge "Add test-scheduler dir to verity"
[bottlenecks.git] / testing-scheduler / server / conductorclient / run_new_workflow.py
1 ##############################################################################\r
2 # Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others\r
3 #\r
4 # All rights reserved. This program and the accompanying materials\r
5 # are made available under the terms of the Apache License, Version 2.0\r
6 # which accompanies this distribution, and is available at\r
7 # http://www.apache.org/licenses/LICENSE-2.0\r
8 ##############################################################################\r
9 \r
10 from conductor import conductor\r
11 import json\r
12 \r
13 \r
class WorkflowMgr(object):
    """Register task/workflow definitions on a Conductor server and run them.

    Typical use is ``setTaskDef`` (or ``setTaskDefFromFile``), then
    ``setWorkflowDef`` (or ``setWorkflowFromFile``), then ``startWorkflow``.
    ``startWorkflow`` refuses to run until both definition steps were called
    on this instance.
    """

    def __init__(self, serverAddr):
        """Create the metadata and workflow clients for one server.

        :param serverAddr: base address of the server; ``'/api'`` is appended
            to it before it is handed to the conductor clients.
        """
        self._serverAddr = serverAddr + '/api'
        self._metaDataClient = conductor.MetadataClient(self._serverAddr)
        self._workflowClient = conductor.WorkflowClient(self._serverAddr)
        # Flags flipped by setTaskDef / setWorkflowDef; startWorkflow checks
        # them before contacting the server.
        self._tasksDefined = False
        self._workflowDefined = False
        self._workflowName = ""

    def setTaskDef(self, taskJson):
        """Register the task definitions contained in a JSON string.

        :param taskJson: JSON text whose top-level value is an object; every
            value of that object is passed to ``registerTaskDefs`` in turn
            (the keys are ignored).
        :raises ValueError: if ``taskJson`` is not valid JSON.
        """
        taskGroups = json.loads(taskJson)
        # Single pre-formatted argument keeps the output identical on
        # Python 2 and Python 3.
        print("define tasks:\n%s" % taskJson)
        for taskDefs in taskGroups.values():
            self._metaDataClient.registerTaskDefs(taskDefs)
        self._tasksDefined = True

    def setWorkflowDef(self, workflowJson):
        """Register a workflow definition given as a JSON string.

        The workflow's ``name`` field is remembered and used later by
        ``startWorkflow``.

        :param workflowJson: JSON text of the workflow definition; it must
            contain a ``name`` key.
        :raises ValueError: if ``workflowJson`` is not valid JSON.
        :raises KeyError: if the definition has no ``name`` key.
        """
        workflowDef = json.loads(workflowJson)
        print("define workflow:\n%s" % workflowJson)
        try:
            self._metaDataClient.createWorkflowDef(workflowDef)
        except Exception as e:
            # Best-effort: a failed registration is only reported and the
            # workflow is still marked as defined below.
            # NOTE(review): presumably tolerates "already exists" replies
            # from the server -- confirm before narrowing this handler.
            print(e)
        self._workflowName = workflowDef['name']
        self._workflowDefined = True

    def startWorkflow(self, param=None):
        """Start the previously defined workflow.

        :param param: input passed to the workflow client's
            ``startWorkflow``; a fresh empty dict is used when omitted.
        :returns: whatever the workflow client returns for the started
            workflow, or ``''`` when tasks or the workflow have not been
            defined yet (an error message is printed in that case).
        """
        # A ``{}`` default would be one dict shared by every call; build a
        # new one per call instead.
        if param is None:
            param = {}
        workflowId = ''
        if not self._tasksDefined:
            print("error: please define the task at first\n")
        elif not self._workflowDefined:
            print("error: please define the workflow at first\n")
        else:
            workflowId = self._workflowClient.startWorkflow(
                self._workflowName, param)
        return workflowId

    def setTaskDefFromFile(self, taskFilePath):
        """Read ``taskFilePath`` and register its content via ``setTaskDef``."""
        with open(taskFilePath, 'r') as f:
            self.setTaskDef(f.read())

    def setWorkflowFromFile(self, workflowFilePath):
        """Read ``workflowFilePath`` and register it via ``setWorkflowDef``."""
        with open(workflowFilePath, 'r') as f:
            self.setWorkflowDef(f.read())
58 \r
59 \r
60 # test demo\r
def main():
    """Demo run: register the mock definitions, then start one workflow.

    Talks to the hard-coded server address below and reads
    ``mock_tasks.json`` and ``mock_workflow.json`` from the current
    working directory.
    """
    manager = WorkflowMgr("http://192.168.199.131:8080")
    # Order matters: startWorkflow only proceeds once both tasks and the
    # workflow have been defined on this manager.
    manager.setTaskDefFromFile('mock_tasks.json')
    manager.setWorkflowFromFile('mock_workflow.json')
    manager.startWorkflow({'input': 'fake'})
68 \r
69 \r
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()