Switching to threading instead of multiprocessing
[functest.git] / docker / core / Try-to-detect-the-race-conditions.patch
1 From 304497b81fbbe9cb8608b947cae76aeaa2b0934e Mon Sep 17 00:00:00 2001
2 From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= <cedric.ollivier@orange.com>
3 Date: Wed, 3 Jun 2020 15:23:59 +0200
4 Subject: [PATCH 11/11] Try to detect the race conditions
5 MIME-Version: 1.0
6 Content-Type: text/plain; charset=UTF-8
7 Content-Transfer-Encoding: 8bit
8
9 Change-Id: I582933832e23d188c7fa5999e713dd5d7e82d2da
10 Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
11 ---
12  rally/cli/main.py    |  5 ++++-
13  rally/task/runner.py | 23 ++++++++++++++++++-----
14  2 files changed, 22 insertions(+), 6 deletions(-)
15
16 diff --git a/rally/cli/main.py b/rally/cli/main.py
17 index 235a57113..14c057c0e 100644
18 --- a/rally/cli/main.py
19 +++ b/rally/cli/main.py
20 @@ -15,6 +15,10 @@
21  
22  """CLI interface for Rally."""
23  
24 +STACK_SIZE = 1024 * 1024
25 +import threading
26 +threading.stack_size(STACK_SIZE)
27 +
28  import sys
29  
30  from rally.cli import cliutils
31 @@ -25,7 +29,6 @@ from rally.cli.commands import plugin
32  from rally.cli.commands import task
33  from rally.cli.commands import verify
34  
35 -
36  categories = {
37      "db": db.DBCommands,
38      "env": env.EnvCommands,
39 diff --git a/rally/task/runner.py b/rally/task/runner.py
40 index 3397e1193..b2fde8550 100644
41 --- a/rally/task/runner.py
42 +++ b/rally/task/runner.py
43 @@ -17,6 +17,7 @@ import abc
44  import collections
45  import copy
46  import multiprocessing
47 +import threading
48  import time
49  
50  from rally.common import logging
51 @@ -51,10 +52,14 @@ def _get_scenario_context(iteration, context_obj):
52  def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
53                         event_queue):
54      iteration = context_obj["iteration"]
55 +    LOG.info("DEBUGRACE %s putting in event_queue iteration: %s",
56 +             threading.get_native_id(), iteration)
57      event_queue.put({
58          "type": "iteration",
59          "value": iteration,
60      })
61 +    LOG.info("DEBUGRACE %s put in event_queue iteration: %s",
62 +             threading.get_native_id(), iteration)
63  
64      # provide arguments isolation between iterations
65      scenario_kwargs = copy.deepcopy(scenario_kwargs)
66 @@ -65,6 +70,8 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
67      scenario_inst = cls(context_obj)
68      error = []
69      try:
70 +        LOG.info("DEBUGRACE %s running %s %s",
71 +                 threading.get_native_id(), scenario_inst, scenario_inst)
72          with rutils.Timer() as timer:
73              getattr(scenario_inst, method_name)(**scenario_kwargs)
74      except Exception as e:
75 @@ -87,8 +94,14 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
76  
77  def _worker_thread(queue, cls, method_name, context_obj, scenario_kwargs,
78                     event_queue):
79 -    queue.put(_run_scenario_once(cls, method_name, context_obj,
80 -                                 scenario_kwargs, event_queue))
81 +    result = _run_scenario_once(cls, method_name, context_obj,
82 +                                scenario_kwargs, event_queue)
83 +    LOG.info("DEBUGRACE %s putting in result_queue context_obj: %s",
84 +             threading.get_native_id(),
85 +             context_obj)
86 +    queue.put(result)
87 +    LOG.info("DEBUGRACE %s put in result_queue context_obj: %s: %s",
88 +             threading.get_native_id(), context_obj, result)
89  
90  
91  def _log_worker_info(**info):
92 @@ -186,9 +199,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
93          for i in range(processes_to_start):
94              kwrgs = {"processes_to_start": processes_to_start,
95                       "processes_counter": i}
96 -            process = multiprocessing.Process(target=worker_process,
97 -                                              args=next(worker_args_gen),
98 -                                              kwargs={"info": kwrgs})
99 +            process = threading.Thread(target=worker_process,
100 +                                       args=next(worker_args_gen),
101 +                                       kwargs={"info": kwrgs})
102              process.start()
103              process_pool.append(process)
104  
105 -- 
106 2.26.2
107