Switch to queue.Queue() in Rally 58/70258/1
authorCédric Ollivier <cedric.ollivier@orange.com>
Wed, 3 Jun 2020 08:06:29 +0000 (10:06 +0200)
committerCédric Ollivier <cedric.ollivier@orange.com>
Wed, 3 Jun 2020 08:08:18 +0000 (10:08 +0200)
It's still failing.

Change-Id: I20fdf5a8e05bccc30a760934f6c0e3cac16d763c
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
(cherry picked from commit 383725f1500047238b39c0dfd51e6ac533094a92)

docker/core/Try-to-detect-the-race-conditions.patch

index 6c9d025..82f60e2 100644 (file)
@@ -1,4 +1,4 @@
-From 41256d0983cda2914948898c5d4a0f74fa161dac Mon Sep 17 00:00:00 2001
+From 41ce1778631894401573161e627a78c2b44182a1 Mon Sep 17 00:00:00 2001
 From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= <cedric.ollivier@orange.com>
 Date: Thu, 30 Apr 2020 13:59:24 +0200
 Subject: [PATCH] Try to detect the race conditions
@@ -10,13 +10,13 @@ Change-Id: I9b468ec1cf79e0a66abeb1fb48f5f0f067c2c198
 Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
 ---
  rally/cli/main.py                             |  6 +++
- rally/plugins/task/runners/constant.py        | 38 ++++++++++++++++---
- rally/plugins/task/runners/rps.py             |  4 +-
- .../task/scenarios/requests/http_requests.py  |  9 +++++
- .../plugins/task/scenarios/requests/utils.py  |  9 +++++
- rally/task/runner.py                          | 29 ++++++++++++--
- rally/task/utils.py                           | 15 ++++++++
- 7 files changed, 99 insertions(+), 11 deletions(-)
+ rally/plugins/task/runners/constant.py        | 41 +++++++++++++++----
+ rally/plugins/task/runners/rps.py             |  8 ++--
+ .../task/scenarios/requests/http_requests.py  |  9 ++++
+ .../plugins/task/scenarios/requests/utils.py  |  9 ++++
+ rally/task/runner.py                          | 29 +++++++++++--
+ rally/task/utils.py                           | 15 +++++++
+ 7 files changed, 102 insertions(+), 15 deletions(-)
 
 diff --git a/rally/cli/main.py b/rally/cli/main.py
 index 235a57113..d931924d8 100644
@@ -36,9 +36,18 @@ index 235a57113..d931924d8 100644
  
  from rally.cli import cliutils
 diff --git a/rally/plugins/task/runners/constant.py b/rally/plugins/task/runners/constant.py
-index 5feb1fee1..e872b768b 100644
+index 5feb1fee1..2f6142cfa 100644
 --- a/rally/plugins/task/runners/constant.py
 +++ b/rally/plugins/task/runners/constant.py
+@@ -15,7 +15,7 @@
+ import collections
+ import multiprocessing
+-import queue as Queue
++import queue
+ import threading
+ import time
 @@ -24,6 +24,10 @@ from rally.common import validation
  from rally import consts
  from rally.task import runner
@@ -60,6 +69,15 @@ index 5feb1fee1..e872b768b 100644
          if times is not None:
              return iteration < times and not aborted.is_set()
          elif duration is not None:
+@@ -74,7 +81,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
+                             method_name=method_name, args=args)
+     if timeout:
+-        timeout_queue = Queue.Queue()
++        timeout_queue = queue.Queue()
+         collector_thr_by_timeout = threading.Thread(
+             target=utils.timeout_thread,
+             args=(timeout_queue, )
 @@ -82,6 +89,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
          collector_thr_by_timeout.start()
  
@@ -116,8 +134,8 @@ index 5feb1fee1..e872b768b 100644
  
 -        result_queue = multiprocessing.Queue()
 -        event_queue = multiprocessing.Queue()
-+        result_queue = multiprocessing.SimpleQueue()
-+        event_queue = multiprocessing.SimpleQueue()
++        result_queue = queue.Queue()
++        event_queue = queue.Queue()
  
          def worker_args_gen(concurrency_overhead):
              while True:
@@ -127,28 +145,41 @@ index 5feb1fee1..e872b768b 100644
  
 -        result_queue = multiprocessing.Queue()
 -        event_queue = multiprocessing.Queue()
-+        result_queue = multiprocessing.SimpleQueue()
-+        event_queue = multiprocessing.SimpleQueue()
++        result_queue = queue.Queue()
++        event_queue = queue.Queue()
  
          def worker_args_gen(concurrency_overhead):
              while True:
-@@ -340,3 +367,4 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
-             processes_to_start, _worker_process,
-             worker_args_gen(concurrency_overhead))
-         self._join_processes(process_pool, result_queue, event_queue)
-+
 diff --git a/rally/plugins/task/runners/rps.py b/rally/plugins/task/runners/rps.py
-index 98a706d11..74ccfae94 100644
+index 98a706d11..26b55feab 100644
 --- a/rally/plugins/task/runners/rps.py
 +++ b/rally/plugins/task/runners/rps.py
+@@ -15,7 +15,7 @@
+ import collections
+ import multiprocessing
+-import queue as Queue
++import queue
+ import threading
+ import time
+@@ -69,7 +69,7 @@ def _worker_process(queue, iteration_gen, timeout, times, max_concurrent,
+         (sleep * info["processes_counter"]) / info["processes_to_start"])
+     start = time.time()
+-    timeout_queue = Queue.Queue()
++    timeout_queue = queue.Queue()
+     if timeout:
+         collector_thr_by_timeout = threading.Thread(
 @@ -260,8 +260,8 @@ class RPSScenarioRunner(runner.ScenarioRunner):
                               concurrency_per_worker=concurrency_per_worker,
                               concurrency_overhead=concurrency_overhead)
  
 -        result_queue = multiprocessing.Queue()
 -        event_queue = multiprocessing.Queue()
-+        result_queue = multiprocessing.SimpleQueue()
-+        event_queue = multiprocessing.SimpleQueue()
++        result_queue = queue.Queue()
++        event_queue = queue.Queue()
  
          def worker_args_gen(times_overhead, concurrency_overhead):
              """Generate arguments for process worker.