1 From 304497b81fbbe9cb8608b947cae76aeaa2b0934e Mon Sep 17 00:00:00 2001
2 From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= <cedric.ollivier@orange.com>
3 Date: Wed, 3 Jun 2020 15:23:59 +0200
4 Subject: [PATCH 11/11] Try to detect the race conditions
6 Content-Type: text/plain; charset=UTF-8
7 Content-Transfer-Encoding: 8bit
9 Change-Id: I582933832e23d188c7fa5999e713dd5d7e82d2da
10 Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
12 rally/cli/main.py | 5 ++++-
13 rally/task/runner.py | 23 ++++++++++++++++++-----
14 2 files changed, 22 insertions(+), 6 deletions(-)
16 diff --git a/rally/cli/main.py b/rally/cli/main.py
17 index 235a57113..14c057c0e 100644
18 --- a/rally/cli/main.py
19 +++ b/rally/cli/main.py
22 """CLI interface for Rally."""
24 +STACK_SIZE = 1024 * 1024
26 +threading.stack_size(STACK_SIZE)
30 from rally.cli import cliutils
31 @@ -25,7 +29,6 @@ from rally.cli.commands import plugin
32 from rally.cli.commands import task
33 from rally.cli.commands import verify
38 "env": env.EnvCommands,
39 diff --git a/rally/task/runner.py b/rally/task/runner.py
40 index 3397e1193..b2fde8550 100644
41 --- a/rally/task/runner.py
42 +++ b/rally/task/runner.py
43 @@ -17,6 +17,7 @@ import abc
46 import multiprocessing
50 from rally.common import logging
51 @@ -51,10 +52,14 @@ def _get_scenario_context(iteration, context_obj):
52 def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
54 iteration = context_obj["iteration"]
55 + LOG.info("DEBUGRACE %s putting in event_queue iteration: %s",
56 + threading.get_native_id(), iteration)
61 + LOG.info("DEBUGRACE %s put in event_queue iteration: %s",
62 + threading.get_native_id(), iteration)
64 # provide arguments isolation between iterations
65 scenario_kwargs = copy.deepcopy(scenario_kwargs)
66 @@ -65,6 +70,8 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
67 scenario_inst = cls(context_obj)
70 + LOG.info("DEBUGRACE %s running %s %s",
71 + threading.get_native_id(), scenario_inst, scenario_inst)
72 with rutils.Timer() as timer:
73 getattr(scenario_inst, method_name)(**scenario_kwargs)
74 except Exception as e:
75 @@ -87,8 +94,14 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
77 def _worker_thread(queue, cls, method_name, context_obj, scenario_kwargs,
79 - queue.put(_run_scenario_once(cls, method_name, context_obj,
80 - scenario_kwargs, event_queue))
81 + result = _run_scenario_once(cls, method_name, context_obj,
82 + scenario_kwargs, event_queue)
83 + LOG.info("DEBUGRACE %s putting in result_queue context_obj: %s",
84 + threading.get_native_id(),
87 + LOG.info("DEBUGRACE %s put in result_queue context_obj: %s: %s",
88 + threading.get_native_id(), context_obj, result)
91 def _log_worker_info(**info):
92 @@ -186,9 +199,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
93 for i in range(processes_to_start):
94 kwrgs = {"processes_to_start": processes_to_start,
95 "processes_counter": i}
96 - process = multiprocessing.Process(target=worker_process,
97 - args=next(worker_args_gen),
98 - kwargs={"info": kwrgs})
99 + process = threading.Thread(target=worker_process,
100 + args=next(worker_args_gen),
101 + kwargs={"info": kwrgs})
103 process_pool.append(process)