1 # Copyright 2018: Intel Corporation
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 """A runner that runs a configurable number of times before it returns. Each
17 iteration has a configurable timeout. The loop control depends on the
18 feedback received from the running VNFs. The context PIDs from the VNFs
19 to listen the messages from are given in the scenario "setup" method.
23 import multiprocessing
29 from yardstick.benchmark.runners import base as base_runner
30 from yardstick.common import exceptions
31 from yardstick.common import messaging
32 from yardstick.common import utils
33 from yardstick.common.messaging import consumer
34 from yardstick.common.messaging import payloads
37 LOG = logging.getLogger(__name__)
39 QUEUE_PUT_TIMEOUT = 10
40 ITERATION_TIMEOUT = 180
43 class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
44 """Endpoint class for ``RunnerIterationIPCConsumer``"""
46 def tg_method_started(self, ctxt, **kwargs):
47 if ctxt['id'] in self._ctx_ids:
50 'action': messaging.TG_METHOD_STARTED,
51 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
55 def tg_method_finished(self, ctxt, **kwargs):
56 if ctxt['id'] in self._ctx_ids:
59 'action': messaging.TG_METHOD_FINISHED,
60 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
63 def tg_method_iteration(self, ctxt, **kwargs):
64 if ctxt['id'] in self._ctx_ids:
67 'action': messaging.TG_METHOD_ITERATION,
68 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
72 class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
73 """MQ consumer for "IterationIPC" runner"""
75 def __init__(self, _id, ctx_ids):
77 self._queue = multiprocessing.Queue()
78 endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
79 super(RunnerIterationIPCConsumer, self).__init__(
80 messaging.TOPIC_TG, ctx_ids, endpoints)
81 self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
82 self.iteration_index = None
84 def is_all_kpis_received_in_iteration(self):
85 """Check if all producers registered have sent the ITERATION msg
87 During the present iteration, all producers (traffic generators) must
88 start and finish the traffic injection, and at the end of the traffic
89 injection a TG_METHOD_ITERATION must be sent. This function will check
90 all KPIs in the present iteration are received. E.g.:
91 self.iteration_index = 2
94 'ctx1': [kpi0, kpi1, kpi2],
95 'ctx2': [kpi0, kpi1]} --> return False
98 'ctx1': [kpi0, kpi1, kpi2],
99 'ctx2': [kpi0, kpi1, kpi2]} --> return True
101 while not self._queue.empty():
102 msg = self._queue.get(True, 1)
103 if msg['action'] == messaging.TG_METHOD_ITERATION:
104 id_iter_list = self._kpi_per_id[msg['id']]
105 id_iter_list.append(msg['payload'].kpi)
107 return all(len(id_iter_list) == self.iteration_index
108 for id_iter_list in self._kpi_per_id.values())
111 def _worker_process(queue, cls, method_name, scenario_cfg,
112 context_cfg, aborted, output_queue): # pragma: no cover
113 runner_cfg = scenario_cfg['runner']
115 timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
116 iterations = runner_cfg.get('iterations', 1)
117 run_step = runner_cfg.get('run_step', 'setup,run,teardown')
118 LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
120 runner_cfg['runner_id'] = os.getpid()
122 benchmark = cls(scenario_cfg, context_cfg)
123 method = getattr(benchmark, method_name)
125 if 'setup' not in run_step:
126 raise exceptions.RunnerIterationIPCSetupActionNeeded()
128 producer_ctxs = benchmark.get_mq_ids()
129 if not producer_ctxs:
130 raise exceptions.RunnerIterationIPCNoCtxs()
132 mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
133 mq_consumer.start_rpc_server()
134 mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id'])
137 while 'run' in run_step:
138 LOG.debug('runner=%(runner)s seq=%(sequence)s START',
139 {'runner': runner_cfg['runner_id'],
140 'sequence': iteration_index})
144 mq_consumer.iteration_index = iteration_index
145 mq_producer.start_iteration()
148 utils.wait_until_true(
149 mq_consumer.is_all_kpis_received_in_iteration,
150 timeout=timeout, sleep=2)
151 result = method(data)
152 except Exception: # pylint: disable=broad-except
153 errors = traceback.format_exc()
154 LOG.exception(errors)
156 mq_producer.stop_iteration()
159 output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
160 benchmark_output = {'timestamp': time.time(),
161 'sequence': iteration_index,
164 queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
166 LOG.debug('runner=%(runner)s seq=%(sequence)s END',
167 {'runner': runner_cfg['runner_id'],
168 'sequence': iteration_index})
171 if iteration_index > iterations or aborted.is_set():
172 LOG.info('"IterationIPC" worker END')
175 if 'teardown' in run_step:
179 LOG.exception('Exception during teardown process')
180 mq_consumer.stop_rpc_server()
183 LOG.debug('Data queue size = %s', queue.qsize())
184 LOG.debug('Output queue size = %s', output_queue.qsize())
185 mq_consumer.stop_rpc_server()
188 class IterationIPCRunner(base_runner.Runner):
189 """Run a scenario for a configurable number of times.
191 Each iteration has a configurable timeout. The loop control depends on the
192 feedback received from the running VNFs. The context PIDs from the VNFs to
193 listen the messages from are given in the scenario "setup" method.
195 __execution_type__ = 'IterationIPC'
197 def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
198 name = '{}-{}-{}'.format(
199 self.__execution_type__, scenario_cfg.get('type'), os.getpid())
200 self.process = multiprocessing.Process(
202 target=_worker_process,
203 args=(self.result_queue, cls, method, scenario_cfg,
204 context_cfg, self.aborted, self.output_queue))