# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
import importlib
import logging
import multiprocessing
import subprocess
import time
import traceback

from six import moves

from yardstick.benchmark.scenarios import base as base_scenario
from yardstick.common import messaging
from yardstick.common.messaging import payloads
from yardstick.common.messaging import producer
from yardstick.common import utils
from yardstick.dispatcher.base import Base as DispatcherBase
# Module-level logger; named after this module so runner messages can be
# filtered in the yardstick log configuration.
log = logging.getLogger(__name__)
def _execute_shell_command(command):
"""execute shell script with error handling"""
exitcode = 0
- output = []
try:
output = subprocess.check_output(command, shell=True)
- except Exception:
+ except subprocess.CalledProcessError:
exitcode = -1
output = traceback.format_exc()
log.error("exec command '%s' error:\n ", command)
class Runner(object):
    """Base class for scenario runners.

    Keeps every live runner instance in the class-level ``runners`` list so
    they can be released or terminated collectively.
    """

    # class-level registry of all currently active runner instances
    runners = []
@staticmethod
return types
@staticmethod
- def get(runner_cfg, config):
+ def get(runner_cfg):
"""Returns instance of a scenario runner for execution type.
"""
- # if there is no runner, start the output serializer subprocess
- if not Runner.runners:
- log.debug("Starting dump process file '%s'",
- runner_cfg["output_filename"])
- Runner.queue = multiprocessing.Queue()
- Runner.dump_process = multiprocessing.Process(
- target=_output_serializer_main,
- name="Dumper",
- args=(runner_cfg["output_filename"], Runner.queue, config))
- Runner.dump_process.start()
-
- return Runner.get_cls(runner_cfg["type"])(runner_cfg, Runner.queue)
-
- @staticmethod
- def release_dump_process():
- """Release the dumper process"""
- log.debug("Stopping dump process")
- if Runner.dump_process:
- Runner.queue.put('_TERMINATE_')
- Runner.dump_process.join()
- Runner.dump_process = None
+ return Runner.get_cls(runner_cfg["type"])(runner_cfg)
@staticmethod
def release(runner):
if runner in Runner.runners:
Runner.runners.remove(runner)
- # if this was the last runner, stop the output serializer subprocess
- if not Runner.runners:
- Runner.release_dump_process()
-
@staticmethod
def terminate(runner):
"""Terminate the runner"""
# release dumper process as some errors before any runner is created
if not Runner.runners:
- Runner.release_dump_process()
return
for runner in Runner.runners:
runner.periodic_action_process = None
Runner.release(runner)
- def __init__(self, config, queue):
+ def __init__(self, config):
+ self.task_id = None
+ self.case_name = None
self.config = config
self.periodic_action_process = None
- self.result_queue = queue
+ self.output_queue = multiprocessing.Queue()
+ self.result_queue = multiprocessing.Queue()
self.process = None
self.aborted = multiprocessing.Event()
Runner.runners.append(self)
log.debug("post-stop data: \n%s", data)
self.result_queue.put({'post-stop-action-data': data})
+ def _run_benchmark(self, cls, method_name, scenario_cfg, context_cfg):
+ raise NotImplementedError
+
def run(self, scenario_cfg, context_cfg):
scenario_type = scenario_cfg["type"]
class_name = base_scenario.Scenario.get(scenario_type)
cls = getattr(module, path_split[-1])
self.config['object'] = class_name
+ self.case_name = scenario_cfg['tc']
+ self.task_id = scenario_cfg['task_id']
self.aborted.clear()
# run a potentially configured pre-start action
"""Abort the execution of a scenario"""
self.aborted.set()
- def join(self, timeout=None):
+ QUEUE_JOIN_INTERVAL = 5
+
+ def poll(self, timeout=QUEUE_JOIN_INTERVAL):
self.process.join(timeout)
+ return self.process.exitcode
+
+ def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
+ while self.process.exitcode is None:
+ # drain the queue while we are running otherwise we won't terminate
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+ self.process.join(interval)
+ # drain after the process has exited
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+
+ self.process.terminate()
if self.periodic_action_process:
+ self.periodic_action_process.join(1)
self.periodic_action_process.terminate()
self.periodic_action_process = None
self.run_post_stop_action()
return self.process.exitcode
+
+ def get_output(self):
+ result = {}
+ while not self.output_queue.empty():
+ log.debug("output_queue size %s", self.output_queue.qsize())
+ try:
+ result.update(self.output_queue.get(True, 1))
+ except moves.queue.Empty:
+ pass
+ return result
+
+ def get_result(self):
+ result = []
+
+ dispatcher = self.config['output_config']['DEFAULT']['dispatcher']
+ output_in_influxdb = 'influxdb' in dispatcher
+
+ while not self.result_queue.empty():
+ log.debug("result_queue size %s", self.result_queue.qsize())
+ try:
+ one_record = self.result_queue.get(True, 1)
+ except moves.queue.Empty:
+ pass
+ else:
+ if output_in_influxdb:
+ self._output_to_influxdb(one_record)
+
+ result.append(one_record)
+ return result
+
+ def _output_to_influxdb(self, record):
+ dispatchers = DispatcherBase.get(self.config['output_config'])
+ dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
+ dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
class RunnerProducer(producer.MessagingProducer):
    """Class implementing the message producer for runners"""

    def __init__(self, _id):
        super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)

    def start_iteration(self, version=1, data=None):
        """Publish a 'start iteration' message on the runner topic."""
        data = {} if not data else data
        self.send_message(
            messaging.RUNNER_METHOD_START_ITERATION,
            payloads.RunnerPayload(version=version, data=data))

    def stop_iteration(self, version=1, data=None):
        """Publish a 'stop iteration' message on the runner topic."""
        data = {} if not data else data
        self.send_message(
            messaging.RUNNER_METHOD_STOP_ITERATION,
            payloads.RunnerPayload(version=version, data=data))