Switching to Threading instead of multiprocessing 64/70264/1
authorCédric Ollivier <cedric.ollivier@orange.com>
Wed, 3 Jun 2020 13:25:58 +0000 (15:25 +0200)
committerCédric Ollivier <cedric.ollivier@orange.com>
Wed, 3 Jun 2020 15:34:46 +0000 (17:34 +0200)
Change-Id: Id5059a06447357f4c9b058bad374ed6cbe4d742c
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
(cherry picked from commit 37b7e536d679303ad6ddfade450201efd90a983a)

docker/core/Try-to-detect-the-race-conditions.patch

index 82f60e2..9413d76 100644 (file)
-From 41ce1778631894401573161e627a78c2b44182a1 Mon Sep 17 00:00:00 2001
+From 304497b81fbbe9cb8608b947cae76aeaa2b0934e Mon Sep 17 00:00:00 2001
 From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= <cedric.ollivier@orange.com>
-Date: Thu, 30 Apr 2020 13:59:24 +0200
-Subject: [PATCH] Try to detect the race conditions
+Date: Wed, 3 Jun 2020 15:23:59 +0200
+Subject: [PATCH 11/11] Try to detect the race conditions
 MIME-Version: 1.0
 Content-Type: text/plain; charset=UTF-8
 Content-Transfer-Encoding: 8bit
 
-Change-Id: I9b468ec1cf79e0a66abeb1fb48f5f0f067c2c198
+Change-Id: I582933832e23d188c7fa5999e713dd5d7e82d2da
 Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
 ---
- rally/cli/main.py                             |  6 +++
- rally/plugins/task/runners/constant.py        | 41 +++++++++++++++----
- rally/plugins/task/runners/rps.py             |  8 ++--
- .../task/scenarios/requests/http_requests.py  |  9 ++++
- .../plugins/task/scenarios/requests/utils.py  |  9 ++++
- rally/task/runner.py                          | 29 +++++++++++--
- rally/task/utils.py                           | 15 +++++++
- 7 files changed, 102 insertions(+), 15 deletions(-)
+ rally/cli/main.py    |  5 ++++-
+ rally/task/runner.py | 23 ++++++++++++++++++-----
+ 2 files changed, 22 insertions(+), 6 deletions(-)
 
 diff --git a/rally/cli/main.py b/rally/cli/main.py
-index 235a57113..d931924d8 100644
+index 235a57113..14c057c0e 100644
 --- a/rally/cli/main.py
 +++ b/rally/cli/main.py
-@@ -15,6 +15,12 @@
+@@ -15,6 +15,10 @@
  
  """CLI interface for Rally."""
  
++STACK_SIZE = 1024 * 1024
 +import threading
-+threading.stack_size(1024 * 1024)
-+
-+import multiprocessing as mp
-+mp.set_start_method('fork')
++threading.stack_size(STACK_SIZE)
 +
  import sys
  
  from rally.cli import cliutils
-diff --git a/rally/plugins/task/runners/constant.py b/rally/plugins/task/runners/constant.py
-index 5feb1fee1..2f6142cfa 100644
---- a/rally/plugins/task/runners/constant.py
-+++ b/rally/plugins/task/runners/constant.py
-@@ -15,7 +15,7 @@
- import collections
- import multiprocessing
--import queue as Queue
-+import queue
- import threading
- import time
-@@ -24,6 +24,10 @@ from rally.common import validation
- from rally import consts
- from rally.task import runner
-+from rally.common import cfg
-+from rally.common import logging
-+CONF = cfg.CONF
-+LOG = logging.getLogger(__file__)
- def _worker_process(queue, iteration_gen, timeout, concurrency, times,
-                     duration, context, cls, method_name, args, event_queue,
-@@ -55,6 +59,9 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
-     """
-     def _to_be_continued(iteration, current_duration, aborted, times=None,
-                          duration=None):
-+        LOG.warning(
-+            "!! _to_be_continued  %(iteration)s %(current_duration)s %(aborted)s %(times)s %(duration)s !! " %
-+            {"iteration": iteration, "current_duration": current_duration, "aborted": aborted, "times": times, "duration": duration})
-         if times is not None:
-             return iteration < times and not aborted.is_set()
-         elif duration is not None:
-@@ -74,7 +81,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
-                             method_name=method_name, args=args)
-     if timeout:
--        timeout_queue = Queue.Queue()
-+        timeout_queue = queue.Queue()
-         collector_thr_by_timeout = threading.Thread(
-             target=utils.timeout_thread,
-             args=(timeout_queue, )
-@@ -82,6 +89,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
-         collector_thr_by_timeout.start()
-     iteration = next(iteration_gen)
-+    LOG.warning("!! iteration  %(iteration)s !! " % {"iteration": iteration})
-     start_time = time.time()
-     # NOTE(msimonin): keep the previous behaviour
-     # > when duration is 0, scenario executes exactly 1 time
-@@ -93,13 +101,25 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
-         worker_args = (
-             queue, cls, method_name, scenario_context, args, event_queue)
-+        LOG.warn(
-+            "Cedric _to_be_continued threading.Thread {} {}".format(
-+                runner._worker_thread, worker_args))
-         thread = threading.Thread(target=runner._worker_thread,
-                                   args=worker_args)
-+        LOG.warn(
-+            "Cedric _to_be_continued thread.start() {} {}".format(
-+                runner._worker_thread, worker_args))
-         thread.start()
-+        LOG.warn(
-+            "Cedric _to_be_continued thread.start() {} {}".format(
-+                runner._worker_thread, worker_args))
-         if timeout:
-             timeout_queue.put((thread, time.time() + timeout))
-         pool.append(thread)
-+        LOG.warn(
-+            "Cedric _to_be_continued pool.append {} {}".format(
-+                pool, thread))
-         alive_threads_in_pool += 1
-         while alive_threads_in_pool == concurrency:
-@@ -128,7 +148,14 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
-     # Wait until all threads are done
-     while pool:
--        pool.popleft().join()
-+        thr = pool.popleft()
-+        LOG.warn(
-+            "Cedric _worker_process wait_all_threads {} {} BEFORE JOIN".format(
-+                pool, thr))
-+        thr.join()
-+        LOG.warn(
-+            "Cedric _worker_process wait_all_threads {} {} AFTER JOIN".format(
-+                pool, thr))
-     if timeout:
-         timeout_queue.put((None, None,))
-@@ -229,8 +256,8 @@ class ConstantScenarioRunner(runner.ScenarioRunner):
-                              concurrency_per_worker=concurrency_per_worker,
-                              concurrency_overhead=concurrency_overhead)
--        result_queue = multiprocessing.Queue()
--        event_queue = multiprocessing.Queue()
-+        result_queue = queue.Queue()
-+        event_queue = queue.Queue()
-         def worker_args_gen(concurrency_overhead):
-             while True:
-@@ -324,8 +351,8 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
-                              concurrency_per_worker=concurrency_per_worker,
-                              concurrency_overhead=concurrency_overhead)
--        result_queue = multiprocessing.Queue()
--        event_queue = multiprocessing.Queue()
-+        result_queue = queue.Queue()
-+        event_queue = queue.Queue()
-         def worker_args_gen(concurrency_overhead):
-             while True:
-diff --git a/rally/plugins/task/runners/rps.py b/rally/plugins/task/runners/rps.py
-index 98a706d11..26b55feab 100644
---- a/rally/plugins/task/runners/rps.py
-+++ b/rally/plugins/task/runners/rps.py
-@@ -15,7 +15,7 @@
+@@ -25,7 +29,6 @@ from rally.cli.commands import plugin
+ from rally.cli.commands import task
+ from rally.cli.commands import verify
+-
+ categories = {
+     "db": db.DBCommands,
+     "env": env.EnvCommands,
+diff --git a/rally/task/runner.py b/rally/task/runner.py
+index 3397e1193..b2fde8550 100644
+--- a/rally/task/runner.py
++++ b/rally/task/runner.py
+@@ -17,6 +17,7 @@ import abc
  import collections
+ import copy
  import multiprocessing
--import queue as Queue
-+import queue
- import threading
++import threading
  import time
  
-@@ -69,7 +69,7 @@ def _worker_process(queue, iteration_gen, timeout, times, max_concurrent,
-         (sleep * info["processes_counter"]) / info["processes_to_start"])
-     start = time.time()
--    timeout_queue = Queue.Queue()
-+    timeout_queue = queue.Queue()
-     if timeout:
-         collector_thr_by_timeout = threading.Thread(
-@@ -260,8 +260,8 @@ class RPSScenarioRunner(runner.ScenarioRunner):
-                              concurrency_per_worker=concurrency_per_worker,
-                              concurrency_overhead=concurrency_overhead)
--        result_queue = multiprocessing.Queue()
--        event_queue = multiprocessing.Queue()
-+        result_queue = queue.Queue()
-+        event_queue = queue.Queue()
-         def worker_args_gen(times_overhead, concurrency_overhead):
-             """Generate arguments for process worker.
-diff --git a/rally/plugins/task/scenarios/requests/http_requests.py b/rally/plugins/task/scenarios/requests/http_requests.py
-index e85ee5af2..4afdf29f1 100644
---- a/rally/plugins/task/scenarios/requests/http_requests.py
-+++ b/rally/plugins/task/scenarios/requests/http_requests.py
-@@ -15,6 +15,11 @@ import random
- from rally.plugins.task.scenarios.requests import utils
- from rally.task import scenario
-+from rally.common import cfg
-+from rally.common import logging
-+CONF = cfg.CONF
-+LOG = logging.getLogger(__file__)
-+
- """Scenarios for HTTP requests."""
-@@ -34,6 +39,10 @@ class HttpRequestsCheckRequest(utils.RequestScenario):
-         :param kwargs: optional additional request parameters
-         """
-+        LOG.warn("Cedric run url {}".format(url))
-+        LOG.warn("Cedric run method {}".format(method))
-+        LOG.warn("Cedric run status_code {}".format(status_code))
-+        LOG.warn("Cedric run kwargs {}".format(kwargs))
-         self._check_request(url, method, status_code, **kwargs)
-diff --git a/rally/plugins/task/scenarios/requests/utils.py b/rally/plugins/task/scenarios/requests/utils.py
-index 8fd35347a..cd69900a5 100644
---- a/rally/plugins/task/scenarios/requests/utils.py
-+++ b/rally/plugins/task/scenarios/requests/utils.py
-@@ -15,6 +15,11 @@ import requests
- from rally.task import atomic
- from rally.task import scenario
-+from rally.common import cfg
-+from rally.common import logging
-+CONF = cfg.CONF
-+LOG = logging.getLogger(__file__)
-+
- class RequestScenario(scenario.Scenario):
-     """Base class for Request scenarios with basic atomic actions."""
-@@ -31,6 +36,10 @@ class RequestScenario(scenario.Scenario):
-                             not equal to expected status code
-         """
-+        LOG.warn("Cedric _check_request url {}".format(url))
-+        LOG.warn("Cedric _check_request method {}".format(method))
-+        LOG.warn("Cedric _check_request status_code {}".format(status_code))
-+        LOG.warn("Cedric _check_request kwargs {}".format(kwargs))
-         resp = requests.request(method, url, **kwargs)
-         if status_code != resp.status_code:
-             error_msg = "Expected HTTP request code is `%s` actual `%s`"
-diff --git a/rally/task/runner.py b/rally/task/runner.py
-index 3397e1193..295558f44 100644
---- a/rally/task/runner.py
-+++ b/rally/task/runner.py
-@@ -87,6 +87,11 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
+ from rally.common import logging
+@@ -51,10 +52,14 @@ def _get_scenario_context(iteration, context_obj):
+ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
+                        event_queue):
+     iteration = context_obj["iteration"]
++    LOG.info("DEBUGRACE %s putting in event_queue iteration: %s",
++             threading.get_native_id(), iteration)
+     event_queue.put({
+         "type": "iteration",
+         "value": iteration,
+     })
++    LOG.info("DEBUGRACE %s put in event_queue iteration: %s",
++             threading.get_native_id(), iteration)
+     # provide arguments isolation between iterations
+     scenario_kwargs = copy.deepcopy(scenario_kwargs)
+@@ -65,6 +70,8 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
+     scenario_inst = cls(context_obj)
+     error = []
+     try:
++        LOG.info("DEBUGRACE %s running %s %s",
++                 threading.get_native_id(), scenario_inst, scenario_inst)
+         with rutils.Timer() as timer:
+             getattr(scenario_inst, method_name)(**scenario_kwargs)
+     except Exception as e:
+@@ -87,8 +94,14 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
  
  def _worker_thread(queue, cls, method_name, context_obj, scenario_kwargs,
                     event_queue):
-+    LOG.debug(
-+        "queue.put _run_scenario_once:\n\t%(cls)s\n\t%(method_name)s\n\t"
-+        "%(context_obj)s\n\t%(scenario_kwargs)s\n\t%(event_queue)s" %
-+        {"cls": cls, "method_name": method_name, "context_obj": context_obj,
-+         "scenario_kwargs": scenario_kwargs, "event_queue": event_queue})
-     queue.put(_run_scenario_once(cls, method_name, context_obj,
-                                  scenario_kwargs, event_queue))
-@@ -186,6 +191,8 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
+-    queue.put(_run_scenario_once(cls, method_name, context_obj,
+-                                 scenario_kwargs, event_queue))
++    result = _run_scenario_once(cls, method_name, context_obj,
++                                scenario_kwargs, event_queue)
++    LOG.info("DEBUGRACE %s putting in result_queue context_obj: %s",
++             threading.get_native_id(),
++             context_obj)
++    queue.put(result)
++    LOG.info("DEBUGRACE %s put in result_queue context_obj: %s: %s",
++             threading.get_native_id(), context_obj, result)
+ def _log_worker_info(**info):
+@@ -186,9 +199,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
          for i in range(processes_to_start):
              kwrgs = {"processes_to_start": processes_to_start,
                       "processes_counter": i}
-+            LOG.warning(
-+                "!! _create_process_pool  %(kwrgs)s !! " % {"kwrgs": kwrgs})
-             process = multiprocessing.Process(target=worker_process,
-                                               args=next(worker_args_gen),
-                                               kwargs={"info": kwrgs})
-@@ -202,22 +209,28 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
-         :param event_queue: multiprocessing.Queue that receives the events
-         """
-         while process_pool:
-+            LOG.warn("Cedric _join_processes process_pool {}".format(process_pool))
-             while process_pool and not process_pool[0].is_alive():
-+                LOG.warn("Cedric _join_processes process_pool {}".format(process_pool))
-                 process_pool.popleft().join()
-             if result_queue.empty() and event_queue.empty():
-                 # sleep a bit to avoid 100% usage of CPU by this method
-+                LOG.warn("Cedric _join_processes result_queue is empty {}".format(result_queue))
-+                LOG.warn("Cedric _join_processes event_queue is empty {}".format(event_queue))
-                 time.sleep(0.01)
-             while not event_queue.empty():
--                self.send_event(**event_queue.get())
-+                col_event_queue = event_queue.get()
-+                LOG.warn("Cedric _join_processes event_queue is not empty {}".format(col_event_queue))
-+                self.send_event(**col_event_queue)
-             while not result_queue.empty():
--                self._send_result(result_queue.get())
-+                col_result_queue = result_queue.get()
-+                LOG.warn("Cedric _join_processes result_queue is not empty {}".format(col_result_queue))
-+                self._send_result(col_result_queue)
-         self._flush_results()
--        result_queue.close()
--        event_queue.close()
-     def _flush_results(self):
-         if self.result_batch:
-@@ -245,8 +258,13 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
-         if len(self.result_batch) >= self.batch_size:
-             sorted_batch = sorted(self.result_batch,
-                                   key=lambda r: result["timestamp"])
-+            LOG.debug("result_queue.append:\n\t%(sorted_batch)s" % {
-+                "sorted_batch": sorted_batch
-+            })
-             self.result_queue.append(sorted_batch)
-             del self.result_batch[:]
-+        else:
-+            LOG.debug("WAHT DOEST IT MEAN? ")
-     def send_event(self, type, value=None):
-         """Store event to send it to consumer later.
-@@ -254,6 +272,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
-         :param type: Event type
-         :param value: Optional event data
-         """
-+        LOG.debug("send_event:\n\t%(type)s\n\t%(value)s" % {
-+            "type": type, "value": value
-+        })
-         self.event_queue.append({"type": type,
-                                  "value": value})
-diff --git a/rally/task/utils.py b/rally/task/utils.py
-index 1252a1fc7..783adf3c6 100644
---- a/rally/task/utils.py
-+++ b/rally/task/utils.py
-@@ -176,6 +176,9 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"],
-                     timeout=60, check_interval=1, check_deletion=False,
-                     id_attr="id"):
-+    LOG.debug(
-+        "Waiting for status %(resource)s" % {"resource": resource})
-+
-     resource_repr = getattr(resource, "name", repr(resource))
-     if not isinstance(ready_statuses, (set, list, tuple)):
-         raise ValueError("Ready statuses should be supplied as set, list or "
-@@ -187,7 +190,11 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"],
-     # make all statuses upper case
-     ready_statuses = set(s.upper() for s in ready_statuses or [])
-+    LOG.debug("%(resource)s: ready_statuses %(ready_statuses)s" % {
-+        "resource": resource_repr, "ready_statuses": ready_statuses})
-     failure_statuses = set(s.upper() for s in failure_statuses or [])
-+    LOG.debug("%(resource)s: failure_statuses %(failure_statuses)s" % {
-+        "resource": resource_repr, "failure_statuses": failure_statuses})
-     if (ready_statuses & failure_statuses):
-         raise ValueError(
-@@ -205,9 +212,13 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"],
-     start = time.time()
-     latest_status = get_status(resource, status_attr)
-+    LOG.debug("%(resource)s: latest_status %(latest_status)s" % {
-+        "resource": resource_repr, "latest_status": latest_status})
-     latest_status_update = start
-     while True:
-+        LOG.debug("%(resource)s: timeout %(timeout)s" % {
-+            "resource": resource_repr, "timeout": timeout})
-         try:
-             if id_attr == "id":
-                 resource = update_resource(resource)
-@@ -240,7 +251,11 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"],
-                 status=status,
-                 fault="Status in failure list %s" % str(failure_statuses))
+-            process = multiprocessing.Process(target=worker_process,
+-                                              args=next(worker_args_gen),
+-                                              kwargs={"info": kwrgs})
++            process = threading.Thread(target=worker_process,
++                                       args=next(worker_args_gen),
++                                       kwargs={"info": kwrgs})
+             process.start()
+             process_pool.append(process)
  
-+        LOG.debug("%(resource)s: check_interval %(check_interval)s" % {
-+            "resource": resource_repr, "check_interval": check_interval})
-         time.sleep(check_interval)
-+        LOG.debug("%(resource)s: elapsed_time %(elapsed_time)s" % {
-+            "resource": resource_repr, "elapsed_time": time.time() - start})
-         if time.time() - start > timeout:
-             raise exceptions.TimeoutException(
-                 desired_status="('%s')" % "', '".join(ready_statuses),
 -- 
 2.26.2