Backport of Congress parallel execution 43/36643/6
author Carlos Goncalves <carlos.goncalves@neclab.eu>
Wed, 28 Jun 2017 10:02:51 +0000 (12:02 +0200)
committer Carlos Goncalves <carlos.goncalves@neclab.eu>
Sat, 12 Aug 2017 08:36:51 +0000 (10:36 +0200)
JIRA: APEX-480
JIRA: DOCTOR-78

Change-Id: I6b5b3e7f2daaec7e2ead76d74f8d3713378a5200
Signed-off-by: Carlos Goncalves <carlos.goncalves@neclab.eu>
build/overcloud-full.sh
build/patches/congress-parallel-execution.patch [new file with mode: 0644]

index 403ab8e..0d12dbe 100755 (executable)
@@ -68,11 +68,13 @@ LIBGUESTFS_BACKEND=direct virt-customize \
     --upload ${BUILD_ROOT}/patches/neutron_openstacksdk_dps.patch:/usr/lib/python2.7/site-packages/ \
     --upload ${BUILD_ROOT}/patches/neutron_openstackclient_dps.patch:/usr/lib/python2.7/site-packages/ \
     --upload ${BUILD_ROOT}/patches/puppet-neutron-add-sfc.patch:/usr/share/openstack-puppet/modules/neutron/ \
+    --upload ${BUILD_ROOT}/patches/congress-parallel-execution.patch:/usr/lib/python2.7/site-packages/ \
     -a overcloud-full_build.qcow2
 
 # apply neutron port data plane status patches
 # https://specs.openstack.org/openstack/neutron-specs/specs/backlog/ocata/port-data-plane-status.html
-# Requirement from Doctor project
+# apply congress parallel execution patch
+# Requirements from Doctor project
 # TODO(cgoncalves): code merged in Pike dev cycle. drop from >= OpenStack Pike / > OPNFV Euphrates
 LIBGUESTFS_BACKEND=direct virt-customize \
     --run-command "cd /usr/lib/python2.7/site-packages/ && patch -p1 < neutron_lib_dps.patch " \
@@ -80,6 +82,7 @@ LIBGUESTFS_BACKEND=direct virt-customize \
     --run-command "cd /usr/lib/python2.7/site-packages/ && patch -p1 < neutron_openstacksdk_dps.patch" \
     --run-command "cd /usr/lib/python2.7/site-packages/ && patch -p1 < neutron_openstackclient_dps.patch" \
     --run-command "cd /usr/share/openstack-puppet/modules/neutron && patch -p1 < puppet-neutron-add-sfc.patch" \
+    --run-command "cd /usr/lib/python2.7/site-packages/ && patch -p1 < congress-parallel-execution.patch" \
     -a overcloud-full_build.qcow2
 
 # Arch dependent on x86
diff --git a/build/patches/congress-parallel-execution.patch b/build/patches/congress-parallel-execution.patch
new file mode 100644 (file)
index 0000000..ca48c6f
--- /dev/null
@@ -0,0 +1,86 @@
+From 02ff94adb9bc433549f5b3483f36b2ede19b3614 Mon Sep 17 00:00:00 2001
+From: Masahito Muroi <muroi.masahito@lab.ntt.co.jp>
+Date: Tue, 18 Apr 2017 04:22:24 +0900
+Subject: [PATCH] Parallel execution in DataSource Driver
+
+The datasource driver calls the datasource's API serially when the Policy
+Engine sends execution requests. This can take a long time when the number
+of execution targets is large.
+
+This patch changes the datasource driver to call the datasource's API in parallel.
+
+Closes-Bug: #1670529
+Change-Id: I065bd625004401a1bb78c6d56d929bdaf76d37f0
+---
+ congress/datasources/datasource_driver.py | 15 +++++++++------
+ congress/policy_engines/agnostic.py       |  6 ++++--
+ 2 files changed, 13 insertions(+), 8 deletions(-)
+
+diff --git a/congress/datasources/datasource_driver.py b/congress/datasources/datasource_driver.py
+index eec83017..8eeb62d7 100644
+--- a/congress/datasources/datasource_driver.py
++++ b/congress/datasources/datasource_driver.py
+@@ -1176,8 +1176,8 @@ class DataSourceDriverEndpoints(data_service.DataServiceEndPoints):
+     def request_refresh(self, context, source_id):
+         return self.service.request_refresh()
+
+-    def request_execute(self, context, action, action_args):
+-        return self.service.request_execute(context, action, action_args)
++    def request_execute(self, context, action, action_args, wait):
++        return self.service.request_execute(context, action, action_args, wait)
+
+
+ class PushedDataSourceDriver(DataSourceDriver):
+@@ -1574,18 +1574,21 @@ class ExecutionDriver(object):
+         return {'results': actions}
+
+     # Note(thread-safety): blocking function
+-    def request_execute(self, context, action, action_args):
++    def request_execute(self, context, action, action_args, wait):
+         """Accept execution requests and execute requests from leader"""
+         node_id = context.get('node_id', None)
++        th = None
+         if self._leader_node_id == node_id:
+-                # Note(thread-safety): blocking call
+-                self.execute(action, action_args)
++            # Note(thread-safety): blocking call
++            th = eventlet.spawn(self.execute, action, action_args)
+         elif node_id is not None:
+             if self._leader_node_id is None:
+                 self._leader_node_id = node_id
+                 LOG.debug('New local leader %s selected', self._leader_node_id)
+                 # Note(thread-safety): blocking call
+-                self.execute(action, action_args)
++                th = eventlet.spawn(self.execute, action, action_args)
++        if wait and th:
++            th.wait()
+
+     # Note(thread-safety): blocking function (in some subclasses)
+     def execute(self, action, action_args):
+diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py
+index d1d67bdc..df09ed96 100644
+--- a/congress/policy_engines/agnostic.py
++++ b/congress/policy_engines/agnostic.py
+@@ -2021,7 +2021,9 @@ class DseRuntime (Runtime, data_service.DataService):
+         """Overloading the DseRuntime version of _rpc so it uses dse2."""
+         # TODO(ramineni): This is called only during execute_action, added
+         # the same function name for compatibility with old arch
+-        args = {'action': action, 'action_args': args}
++
++        retry_rpc = cfg.CONF.dse.execute_action_retry
++        args = {'action': action, 'action_args': args, 'wait': retry_rpc}
+
+         def execute_once():
+             return self.rpc(service_name, 'request_execute', args,
+@@ -2045,7 +2047,7 @@ class DseRuntime (Runtime, data_service.DataService):
+                       action, args['action_args'])
+
+         # long timeout for action execution because actions can take a while
+-        if not cfg.CONF.dse.execute_action_retry:
++        if not retry_rpc:
+             # Note(thread-safety): blocking call
+             #   Only when thread pool at capacity
+             eventlet.spawn_n(execute_once)
+-- 
+2.12.3
+
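For readers unfamiliar with the eventlet pattern used in the backported patch, the following standalone sketch illustrates the change: request_execute() hands the blocking datasource call to a green thread instead of executing it inline, and joins the thread only when the caller asks for synchronous behaviour (the wait flag, which agnostic.py derives from cfg.CONF.dse.execute_action_retry). The function and argument names below are illustrative stand-ins, not the actual Congress classes.

import eventlet
eventlet.monkey_patch()


def call_datasource_api(action, action_args):
    # Stand-in for the blocking API call made by ExecutionDriver.execute()
    eventlet.sleep(0.1)  # pretend the external service needs ~100 ms
    return '%s(%s)' % (action, action_args)


def request_execute(action, action_args, wait=False):
    # Spawn the blocking call so the driver keeps servicing other requests
    th = eventlet.spawn(call_datasource_api, action, action_args)
    if wait and th:
        # Join only when the caller needs to observe the result (retry path)
        return th.wait()
    return None


if __name__ == '__main__':
    # Three concurrent executions finish in roughly 0.1 s overall,
    # instead of ~0.3 s with the previous serial behaviour.
    threads = [eventlet.spawn(call_datasource_api, 'reboot', {'server': i})
               for i in range(3)]
    print([t.wait() for t in threads])

The wait flag mirrors the 'wait': retry_rpc argument added in agnostic.py: when execute_action_retry is enabled the RPC must see the call complete before returning, so the green thread is joined; otherwise the request returns immediately.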