1 From 02ff94adb9bc433549f5b3483f36b2ede19b3614 Mon Sep 17 00:00:00 2001
2 From: Masahito Muroi <muroi.masahito@lab.ntt.co.jp>
3 Date: Tue, 18 Apr 2017 04:22:24 +0900
4 Subject: [PATCH] Parallel execution in DataSource Driver
6 Datasource driver calls datasource's API serially when Policy Engine sends
7 execution requests. It could take long time number of execution targets is
10 This patch changes datasource driver calls datasource's API in parallel.
13 Change-Id: I065bd625004401a1bb78c6d56d929bdaf76d37f0
15 congress/datasources/datasource_driver.py | 15 +++++++++------
16 congress/policy_engines/agnostic.py | 6 ++++--
17 2 files changed, 13 insertions(+), 8 deletions(-)
19 diff --git a/congress/datasources/datasource_driver.py b/congress/datasources/datasource_driver.py
20 index eec83017..8eeb62d7 100644
21 --- a/congress/datasources/datasource_driver.py
22 +++ b/congress/datasources/datasource_driver.py
23 @@ -1176,8 +1176,8 @@ class DataSourceDriverEndpoints(data_service.DataServiceEndPoints):
24 def request_refresh(self, context, source_id):
25 return self.service.request_refresh()
27 - def request_execute(self, context, action, action_args):
28 - return self.service.request_execute(context, action, action_args)
29 + def request_execute(self, context, action, action_args, wait):
30 + return self.service.request_execute(context, action, action_args, wait)
33 class PushedDataSourceDriver(DataSourceDriver):
34 @@ -1574,18 +1574,21 @@ class ExecutionDriver(object):
35 return {'results': actions}
37 # Note(thread-safety): blocking function
38 - def request_execute(self, context, action, action_args):
39 + def request_execute(self, context, action, action_args, wait):
40 """Accept execution requests and execute requests from leader"""
41 node_id = context.get('node_id', None)
43 if self._leader_node_id == node_id:
44 - # Note(thread-safety): blocking call
45 - self.execute(action, action_args)
46 + # Note(thread-safety): blocking call
47 + th = eventlet.spawn(self.execute, action, action_args)
48 elif node_id is not None:
49 if self._leader_node_id is None:
50 self._leader_node_id = node_id
51 LOG.debug('New local leader %s selected', self._leader_node_id)
52 # Note(thread-safety): blocking call
53 - self.execute(action, action_args)
54 + th = eventlet.spawn(self.execute, action, action_args)
58 # Note(thread-safety): blocking function (in some subclasses)
59 def execute(self, action, action_args):
60 diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py
61 index d1d67bdc..df09ed96 100644
62 --- a/congress/policy_engines/agnostic.py
63 +++ b/congress/policy_engines/agnostic.py
64 @@ -2021,7 +2021,9 @@ class DseRuntime (Runtime, data_service.DataService):
65 """Overloading the DseRuntime version of _rpc so it uses dse2."""
66 # TODO(ramineni): This is called only during execute_action, added
67 # the same function name for compatibility with old arch
68 - args = {'action': action, 'action_args': args}
70 + retry_rpc = cfg.CONF.dse.execute_action_retry
71 + args = {'action': action, 'action_args': args, 'wait': retry_rpc}
74 return self.rpc(service_name, 'request_execute', args,
75 @@ -2045,7 +2047,7 @@ class DseRuntime (Runtime, data_service.DataService):
76 action, args['action_args'])
78 # long timeout for action execution because actions can take a while
79 - if not cfg.CONF.dse.execute_action_retry:
81 # Note(thread-safety): blocking call
82 # Only when thread pool at capacity
83 eventlet.spawn_n(execute_once)