Switch to SimpleQueue() 49/70249/1
authorCédric Ollivier <cedric.ollivier@orange.com>
Mon, 1 Jun 2020 15:06:29 +0000 (17:06 +0200)
committerCédric Ollivier <cedric.ollivier@orange.com>
Mon, 1 Jun 2020 15:07:05 +0000 (17:07 +0200)
Change-Id: I1f3cb636813b9c3c10b5c829e35bbdeea02a318c
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
(cherry picked from commit 265cc0dfdeb9f65f8641fc5b60900b6ee1315f5a)

docker/core/Try-to-detect-the-race-conditions.patch

index 50d0344..6c9d025 100644 (file)
@@ -1,4 +1,4 @@
-From 92cf158d8932f4509983b3813049be717093253e Mon Sep 17 00:00:00 2001
+From 41256d0983cda2914948898c5d4a0f74fa161dac Mon Sep 17 00:00:00 2001
 From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= <cedric.ollivier@orange.com>
 Date: Thu, 30 Apr 2020 13:59:24 +0200
 Subject: [PATCH] Try to detect the race conditions
@@ -9,13 +9,14 @@ Content-Transfer-Encoding: 8bit
 Change-Id: I9b468ec1cf79e0a66abeb1fb48f5f0f067c2c198
 Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
 ---
- rally/cli/main.py                             |  6 ++++
- rally/plugins/task/runners/constant.py        | 30 ++++++++++++++++++-
- .../task/scenarios/requests/http_requests.py  |  9 ++++++
- .../plugins/task/scenarios/requests/utils.py  |  9 ++++++
- rally/task/runner.py                          | 27 +++++++++++++++--
- rally/task/utils.py                           | 15 ++++++++++
- 6 files changed, 93 insertions(+), 3 deletions(-)
+ rally/cli/main.py                             |  6 +++
+ rally/plugins/task/runners/constant.py        | 38 ++++++++++++++++---
+ rally/plugins/task/runners/rps.py             |  4 +-
+ .../task/scenarios/requests/http_requests.py  |  9 +++++
+ .../plugins/task/scenarios/requests/utils.py  |  9 +++++
+ rally/task/runner.py                          | 29 ++++++++++++--
+ rally/task/utils.py                           | 15 ++++++++
+ 7 files changed, 99 insertions(+), 11 deletions(-)
 
 diff --git a/rally/cli/main.py b/rally/cli/main.py
 index 235a57113..d931924d8 100644
@@ -35,7 +36,7 @@ index 235a57113..d931924d8 100644
  
  from rally.cli import cliutils
 diff --git a/rally/plugins/task/runners/constant.py b/rally/plugins/task/runners/constant.py
-index 5feb1fee1..38a01e28e 100644
+index 5feb1fee1..e872b768b 100644
 --- a/rally/plugins/task/runners/constant.py
 +++ b/rally/plugins/task/runners/constant.py
 @@ -24,6 +24,10 @@ from rally.common import validation
@@ -109,11 +110,48 @@ index 5feb1fee1..38a01e28e 100644
  
      if timeout:
          timeout_queue.put((None, None,))
+@@ -229,8 +256,8 @@ class ConstantScenarioRunner(runner.ScenarioRunner):
+                              concurrency_per_worker=concurrency_per_worker,
+                              concurrency_overhead=concurrency_overhead)
+-        result_queue = multiprocessing.Queue()
+-        event_queue = multiprocessing.Queue()
++        result_queue = multiprocessing.SimpleQueue()
++        event_queue = multiprocessing.SimpleQueue()
+         def worker_args_gen(concurrency_overhead):
+             while True:
+@@ -324,8 +351,8 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
+                              concurrency_per_worker=concurrency_per_worker,
+                              concurrency_overhead=concurrency_overhead)
+-        result_queue = multiprocessing.Queue()
+-        event_queue = multiprocessing.Queue()
++        result_queue = multiprocessing.SimpleQueue()
++        event_queue = multiprocessing.SimpleQueue()
+         def worker_args_gen(concurrency_overhead):
+             while True:
 @@ -340,3 +367,4 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
              processes_to_start, _worker_process,
              worker_args_gen(concurrency_overhead))
          self._join_processes(process_pool, result_queue, event_queue)
 +
+diff --git a/rally/plugins/task/runners/rps.py b/rally/plugins/task/runners/rps.py
+index 98a706d11..74ccfae94 100644
+--- a/rally/plugins/task/runners/rps.py
++++ b/rally/plugins/task/runners/rps.py
+@@ -260,8 +260,8 @@ class RPSScenarioRunner(runner.ScenarioRunner):
+                              concurrency_per_worker=concurrency_per_worker,
+                              concurrency_overhead=concurrency_overhead)
+-        result_queue = multiprocessing.Queue()
+-        event_queue = multiprocessing.Queue()
++        result_queue = multiprocessing.SimpleQueue()
++        event_queue = multiprocessing.SimpleQueue()
+         def worker_args_gen(times_overhead, concurrency_overhead):
+             """Generate arguments for process worker.
 diff --git a/rally/plugins/task/scenarios/requests/http_requests.py b/rally/plugins/task/scenarios/requests/http_requests.py
 index e85ee5af2..4afdf29f1 100644
 --- a/rally/plugins/task/scenarios/requests/http_requests.py
@@ -169,7 +207,7 @@ index 8fd35347a..cd69900a5 100644
          if status_code != resp.status_code:
              error_msg = "Expected HTTP request code is `%s` actual `%s`"
 diff --git a/rally/task/runner.py b/rally/task/runner.py
-index 3397e1193..57f9428f4 100644
+index 3397e1193..295558f44 100644
 --- a/rally/task/runner.py
 +++ b/rally/task/runner.py
 @@ -87,6 +87,11 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
@@ -193,7 +231,7 @@ index 3397e1193..57f9428f4 100644
              process = multiprocessing.Process(target=worker_process,
                                                args=next(worker_args_gen),
                                                kwargs={"info": kwrgs})
-@@ -202,18 +209,26 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
+@@ -202,22 +209,28 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
          :param event_queue: multiprocessing.Queue that receives the events
          """
          while process_pool:
@@ -221,8 +259,12 @@ index 3397e1193..57f9428f4 100644
 +                self._send_result(col_result_queue)
  
          self._flush_results()
-         result_queue.close()
-@@ -245,8 +260,13 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
+-        result_queue.close()
+-        event_queue.close()
+     def _flush_results(self):
+         if self.result_batch:
+@@ -245,8 +258,13 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
          if len(self.result_batch) >= self.batch_size:
              sorted_batch = sorted(self.result_batch,
                                    key=lambda r: result["timestamp"])
@@ -236,7 +278,7 @@ index 3397e1193..57f9428f4 100644
  
      def send_event(self, type, value=None):
          """Store event to send it to consumer later.
-@@ -254,6 +274,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
+@@ -254,6 +272,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
          :param type: Event type
          :param value: Optional event data
          """