1 # Copyright 2018: Intel Corporation
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 """A runner that runs a configurable number of times before it returns. Each
17 iteration has a configurable timeout. The loop control depends on the
18 feedback received from the running VNFs. The context PIDs from the VNFs
19 to listen to for messages are given in the scenario "setup" method.
23 import multiprocessing
29 from yardstick.benchmark.runners import base
30 from yardstick.common import exceptions
31 from yardstick.common import messaging
32 from yardstick.common import utils
33 from yardstick.common.messaging import consumer
34 from yardstick.common.messaging import payloads
# Module-level logger named after this module.
37 LOG = logging.getLogger(__name__)
# Timeout argument for the blocking ``put`` calls made in ``_worker_process``
# (presumably seconds -- the queue types are not visible here; confirm).
39 QUEUE_PUT_TIMEOUT = 10
# Default for the runner 'timeout' option; the value is passed to
# ``utils.wait_until_true`` while waiting for one iteration's KPIs.
40 ITERATION_TIMEOUT = 180
# Notification endpoint with one handler per traffic-generator event
# (started, finished, iteration). Every handler ignores notifications whose
# context id (``ctxt['id']``) is not listed in ``self._ctx_ids``.
# NOTE(review): this listing omits lines between each ``if`` and the dict
# entries that follow it. ``RunnerIterationIPCConsumer`` reads the keys 'id',
# 'action' and 'payload' from queued messages, so each handler presumably puts
# such a dict on the shared queue -- confirm against the full file.
43 class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
44 """Endpoint class for ``RunnerIterationIPCConsumer``"""
# Handler for the TG_METHOD_STARTED notification.
46 def tg_method_started(self, ctxt, **kwargs):
# Only react to producers this endpoint was registered for.
47 if ctxt['id'] in self._ctx_ids:
50 'action': messaging.TG_METHOD_STARTED,
51 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
# Handler for the TG_METHOD_FINISHED notification.
55 def tg_method_finished(self, ctxt, **kwargs):
56 if ctxt['id'] in self._ctx_ids:
59 'action': messaging.TG_METHOD_FINISHED,
60 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
# Handler for the TG_METHOD_ITERATION notification; this is the only action
# the consumer records KPIs for.
63 def tg_method_iteration(self, ctxt, **kwargs):
64 if ctxt['id'] in self._ctx_ids:
67 'action': messaging.TG_METHOD_ITERATION,
68 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
# Consumer that records, per producer context id, the KPIs carried by
# TG_METHOD_ITERATION messages and reports whether an iteration is complete.
72 class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
73 """MQ consumer for "IterationIPC" runner"""
# _id: identifier forwarded to the endpoint constructor.
# ctx_ids: producer context ids; forwarded to the endpoint and the base class
# and used as the keys of ``_kpi_per_id``.
# NOTE(review): a line may be missing from this listing right after the
# ``def`` -- confirm against the full file.
75 def __init__(self, _id, ctx_ids):
# Queue handed to the endpoint and drained in
# ``is_all_kpis_received_in_iteration``.
77 self._queue = multiprocessing.Queue()
78 endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
79 super(RunnerIterationIPCConsumer, self).__init__(
80 messaging.TOPIC_TG, ctx_ids, endpoints)
# One (initially empty) KPI list per producer context id.
81 self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
# Assigned by the worker before each wait; compared against the number of
# KPIs recorded per producer.
82 self.iteration_index = None
84 def is_all_kpis_received_in_iteration(self):
85 """Check if all producers registered have sent the ITERATION msg
87 During the present iteration, all producers (traffic generators) must
88 start and finish the traffic injection, and at the end of the traffic
89 injection a TG_METHOD_ITERATION must be sent. This function will check
90 all KPIs in the present iteration are received. E.g.:
91 self.iteration_index = 3
94 'ctx1': [kpi0, kpi1, kpi2],
95 'ctx2': [kpi0, kpi1]} --> return False
98 'ctx1': [kpi0, kpi1, kpi2],
99 'ctx2': [kpi0, kpi1, kpi2]} --> return True
# Drain every message currently reported as queued.
# NOTE(review): multiprocessing.Queue.empty() is documented as unreliable, so
# the ``get`` below may still raise ``queue.Empty`` after its 1 s timeout.
101 while not self._queue.empty():
# Blocking get with a one second timeout.
102 msg = self._queue.get(True, 1)
# Only ITERATION messages carry a KPI to record; as far as this listing
# shows, messages with any other action are simply discarded.
103 if msg['action'] == messaging.TG_METHOD_ITERATION:
104 id_iter_list = self._kpi_per_id[msg['id']]
105 id_iter_list.append(msg['payload'].kpi)
# Complete only when every producer has exactly ``iteration_index`` KPIs.
107 return all(len(id_iter_list) == self.iteration_index
108 for id_iter_list in self._kpi_per_id.values())
# Worker entry point; used as the ``target`` of the ``multiprocessing.Process``
# created in ``IterationIPCRunner._run_benchmark``.
#
# queue: receives one dict per iteration (keys visible here: 'timestamp',
#     'sequence').
# cls: benchmark class, instantiated as ``cls(scenario_cfg, context_cfg)``.
# method_name: name of the benchmark method called once per iteration.
# scenario_cfg: scenario configuration; its 'runner' entry provides the
#     'timeout', 'iterations' and 'run_step' options.
# context_cfg: passed through to the benchmark constructor.
# aborted: object whose ``is_set()`` ends the loop when true.
# output_queue: receives the value returned by the benchmark method.
#
# Raises RunnerIterationIPCSetupActionNeeded when 'setup' is not in
# ``run_step`` and RunnerIterationIPCNoCtxs when ``get_mq_ids()`` is empty.
#
# NOTE(review): this listing omits several short lines (the initialisation of
# ``iteration_index``, ``data``, ``result`` and ``errors``, the ``try:`` that
# pairs with the ``except`` below, the loop exit, and the teardown call).
# Confirm the control flow against the full file before changing it.
111 def _worker_process(queue, cls, method_name, scenario_cfg,
112 context_cfg, aborted, output_queue): # pragma: no cover
113 runner_cfg = scenario_cfg['runner']
# Runner options with their defaults.
115 timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
116 iterations = runner_cfg.get('iterations', 1)
117 run_step = runner_cfg.get('run_step', 'setup,run,teardown')
118 LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
# The worker's own PID is used as the runner id in the log messages below.
120 runner_cfg['runner_id'] = os.getpid()
122 benchmark = cls(scenario_cfg, context_cfg)
123 method = getattr(benchmark, method_name)
# This runner cannot work without the 'setup' step.
125 if 'setup' not in run_step:
126 raise exceptions.RunnerIterationIPCSetupActionNeeded()
# Context ids of the producers to listen to; at least one is required.
128 producer_ctxs = benchmark.get_mq_ids()
129 if not producer_ctxs:
130 raise exceptions.RunnerIterationIPCNoCtxs()
# Start consuming producer notifications before entering the loop.
132 mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
133 mq_consumer.start_rpc_server()
136 while 'run' in run_step:
137 LOG.debug('runner=%(runner)s seq=%(sequence)s START',
138 {'runner': runner_cfg['runner_id'],
139 'sequence': iteration_index})
# Tell the consumer how many KPIs per producer make this iteration complete.
143 mq_consumer.iteration_index = iteration_index
# Poll every 2 s, for at most ``timeout``, until all producers have reported;
# only then call the benchmark method.
146 utils.wait_until_true(
147 mq_consumer.is_all_kpis_received_in_iteration,
148 timeout=timeout, sleep=2)
149 result = method(data)
# Any failure is logged with its traceback rather than propagated.
150 except Exception: # pylint: disable=broad-except
151 errors = traceback.format_exc()
152 LOG.exception(errors)
# Publish the method's return value and the per-iteration record.
155 output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
156 benchmark_output = {'timestamp': time.time(),
157 'sequence': iteration_index,
160 queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
162 LOG.debug('runner=%(runner)s seq=%(sequence)s END',
163 {'runner': runner_cfg['runner_id'],
164 'sequence': iteration_index})
# Stop after the configured number of iterations or when an abort is requested.
167 if iteration_index > iterations or aborted.is_set():
168 LOG.info('"IterationIPC" worker END')
171 if 'teardown' in run_step:
# On a teardown failure the consumer is stopped here as well.
175 LOG.exception('Exception during teardown process')
176 mq_consumer.stop_rpc_server()
179 LOG.debug('Data queue size = %s', queue.qsize())
180 LOG.debug('Output queue size = %s', output_queue.qsize())
# Normal exit path: stop the consumer.
181 mq_consumer.stop_rpc_server()
184 class IterationIPCRunner(base.Runner):
185 """Run a scenario for a configurable number of times.
187 Each iteration has a configurable timeout. The loop control depends on the
188 feedback received from the running VNFs. The context PIDs from the VNFs to
189 listen to for messages are given in the scenario "setup" method.
# Execution type string; also used as the prefix of the ``name`` built below.
191 __execution_type__ = 'IterationIPC'
# Builds the ``multiprocessing.Process`` that runs ``_worker_process`` with
# this runner's result queue, abort flag and output queue.
# NOTE(review): this listing omits at least one argument line of the
# ``Process(...)`` call and may end before the method does -- confirm against
# the full file.
193 def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
# Name format: <execution type>-<scenario type>-<PID of the calling process>.
194 name = '{}-{}-{}'.format(
195 self.__execution_type__, scenario_cfg.get('type'), os.getpid())
196 self.process = multiprocessing.Process(
198 target=_worker_process,
199 args=(self.result_queue, cls, method, scenario_cfg,
200 context_cfg, self.aborted, self.output_queue))