Update openstack refs to opendev
[apex.git] / build / patches / congress-parallel-execution.patch
1 From 02ff94adb9bc433549f5b3483f36b2ede19b3614 Mon Sep 17 00:00:00 2001
2 From: Masahito Muroi <muroi.masahito@lab.ntt.co.jp>
3 Date: Tue, 18 Apr 2017 04:22:24 +0900
4 Subject: [PATCH] Parallel execution in DataSource Driver
5
6 Datasource driver calls datasource's API serially when Policy Engine sends
7 execution requests.  It could take long time number of execution targets is
8 a lots.
9
10 This patch changes datasource driver calls datasource's API in parallel.
11
12 Closes-Bug: #1670529
13 Change-Id: I065bd625004401a1bb78c6d56d929bdaf76d37f0
14 ---
15  congress/datasources/datasource_driver.py | 15 +++++++++------
16  congress/policy_engines/agnostic.py       |  6 ++++--
17  2 files changed, 13 insertions(+), 8 deletions(-)
18
19 diff --git a/congress/datasources/datasource_driver.py b/congress/datasources/datasource_driver.py
20 index eec83017..8eeb62d7 100644
21 --- a/congress/datasources/datasource_driver.py
22 +++ b/congress/datasources/datasource_driver.py
23 @@ -1176,8 +1176,8 @@ class DataSourceDriverEndpoints(data_service.DataServiceEndPoints):
24      def request_refresh(self, context, source_id):
25          return self.service.request_refresh()
26  
27 -    def request_execute(self, context, action, action_args):
28 -        return self.service.request_execute(context, action, action_args)
29 +    def request_execute(self, context, action, action_args, wait):
30 +        return self.service.request_execute(context, action, action_args, wait)
31  
32  
33  class PushedDataSourceDriver(DataSourceDriver):
34 @@ -1574,18 +1574,21 @@ class ExecutionDriver(object):
35          return {'results': actions}
36  
37      # Note(thread-safety): blocking function
38 -    def request_execute(self, context, action, action_args):
39 +    def request_execute(self, context, action, action_args, wait):
40          """Accept execution requests and execute requests from leader"""
41          node_id = context.get('node_id', None)
42 +        th = None
43          if self._leader_node_id == node_id:
44 -                # Note(thread-safety): blocking call
45 -                self.execute(action, action_args)
46 +            # Note(thread-safety): blocking call
47 +            th = eventlet.spawn(self.execute, action, action_args)
48          elif node_id is not None:
49              if self._leader_node_id is None:
50                  self._leader_node_id = node_id
51                  LOG.debug('New local leader %s selected', self._leader_node_id)
52                  # Note(thread-safety): blocking call
53 -                self.execute(action, action_args)
54 +                th = eventlet.spawn(self.execute, action, action_args)
55 +        if wait and th:
56 +            th.wait()
57  
58      # Note(thread-safety): blocking function (in some subclasses)
59      def execute(self, action, action_args):
60 diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py
61 index d1d67bdc..df09ed96 100644
62 --- a/congress/policy_engines/agnostic.py
63 +++ b/congress/policy_engines/agnostic.py
64 @@ -2021,7 +2021,9 @@ class DseRuntime (Runtime, data_service.DataService):
65          """Overloading the DseRuntime version of _rpc so it uses dse2."""
66          # TODO(ramineni): This is called only during execute_action, added
67          # the same function name for compatibility with old arch
68 -        args = {'action': action, 'action_args': args}
69 +
70 +        retry_rpc = cfg.CONF.dse.execute_action_retry
71 +        args = {'action': action, 'action_args': args, 'wait': retry_rpc}
72  
73          def execute_once():
74              return self.rpc(service_name, 'request_execute', args,
75 @@ -2045,7 +2047,7 @@ class DseRuntime (Runtime, data_service.DataService):
76                        action, args['action_args'])
77  
78          # long timeout for action execution because actions can take a while
79 -        if not cfg.CONF.dse.execute_action_retry:
80 +        if not retry_rpc:
81              # Note(thread-safety): blocking call
82              #   Only when thread pool at capacity
83              eventlet.spawn_n(execute_once)
84 -- 
85 2.12.3
86