# Copyright 2018: Intel Corporation
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""A runner that runs a configurable number of times before it returns. Each
   iteration has a configurable timeout. The loop control depends on the
   feedback received from the running VNFs. The context PIDs of the VNFs
   to listen for messages from are given by the scenario "setup" method.
"""

import logging
import multiprocessing
import os
import time
import traceback

from yardstick.benchmark.runners import base
from yardstick.common import exceptions
from yardstick.common import messaging
from yardstick.common import utils
from yardstick.common.messaging import consumer
from yardstick.common.messaging import payloads


LOG = logging.getLogger(__name__)

# Timeout, in seconds, passed to ``Queue.put`` when pushing items to the
# multiprocessing queues used by this runner.
QUEUE_PUT_TIMEOUT = 10
# Default per-iteration timeout, used when the runner configuration has no
# 'timeout' entry (presumably seconds; unit is defined by
# ``utils.wait_until_true`` -- confirm).
ITERATION_TIMEOUT = 180


class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
    """Endpoint class for ``RunnerIterationIPCConsumer``

    Every notification sent by a registered producer context is wrapped in a
    dictionary with the keys 'id', 'action' and 'payload' and pushed to the
    queue read by ``RunnerIterationIPCConsumer``. Notifications whose context
    ID is not registered are ignored.
    """

    def _put_notification(self, ctxt, action, kwargs):
        """Queue one notification if it comes from a registered context.

        :param ctxt: (dict) message context; ``ctxt['id']`` identifies the
                     producer
        :param action: one of the ``messaging.TG_METHOD_*`` constants
        :param kwargs: (dict) raw payload, converted with
                       ``payloads.TrafficGeneratorPayload.dict_to_obj``
        """
        if ctxt['id'] not in self._ctx_pids:
            return

        # NOTE(review): assumes ``consumer.NotificationHandler`` keeps the
        # queue handed to its constructor as ``self._queue`` -- confirm.
        self._queue.put(
            {'id': ctxt['id'],
             'action': action,
             'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
                 kwargs)},
            True, QUEUE_PUT_TIMEOUT)

    def tg_method_started(self, ctxt, **kwargs):
        """Handle a ``TG_METHOD_STARTED`` notification"""
        self._put_notification(ctxt, messaging.TG_METHOD_STARTED, kwargs)

    def tg_method_finished(self, ctxt, **kwargs):
        """Handle a ``TG_METHOD_FINISHED`` notification"""
        self._put_notification(ctxt, messaging.TG_METHOD_FINISHED, kwargs)

    def tg_method_iteration(self, ctxt, **kwargs):
        """Handle a ``TG_METHOD_ITERATION`` notification"""
        self._put_notification(ctxt, messaging.TG_METHOD_ITERATION, kwargs)


class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
    """MQ consumer for "IterationIPC" runner"""

    def __init__(self, _id, ctx_ids):
        """Create the consumer and its notification endpoint.

        :param _id: identifier of the runner, handed to the endpoint
        :param ctx_ids: iterable with the IDs of the producer contexts to
                        listen to
        """
        # NOTE(review): storing the runner ID on the consumer is not
        # required by any code in this module -- confirm it is wanted.
        self._id = _id
        # Queue filled by the endpoint and drained in
        # ``is_all_kpis_received_in_iteration``.
        self._queue = multiprocessing.Queue()
        endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
        super(RunnerIterationIPCConsumer, self).__init__(
            messaging.TOPIC_TG, ctx_ids, endpoints)
        # One list of received KPIs per producer context.
        self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
        # Set by the worker loop before waiting for the iteration KPIs.
        self.iteration_index = None

    def is_all_kpis_received_in_iteration(self):
        """Check if all producers registered have sent the ITERATION msg

        During the present iteration, all producers (traffic generators) must
        start and finish the traffic injection, and at the end of the traffic
        injection a TG_METHOD_ITERATION must be sent. This function will check
        all KPIs in the present iteration are received, i.e. every producer
        has reported exactly ``self.iteration_index`` KPIs. E.g.:
          self.iteration_index = 3

          self._kpi_per_id = {
            'ctx1': [kpi0, kpi1, kpi2],
            'ctx2': [kpi0, kpi1]}          --> return False

          self._kpi_per_id = {
            'ctx1': [kpi0, kpi1, kpi2],
            'ctx2': [kpi0, kpi1, kpi2]}    --> return True

        :return: (bool) True if every producer has sent as many KPIs as
                 ``self.iteration_index``
        """
        # Drain the pending notifications; only ITERATION messages carry a
        # KPI, STARTED/FINISHED messages are discarded.
        while not self._queue.empty():
            msg = self._queue.get(True, 1)
            if msg['action'] == messaging.TG_METHOD_ITERATION:
                id_iter_list = self._kpi_per_id[msg['id']]
                id_iter_list.append(msg['payload'].kpi)

        return all(len(id_iter_list) == self.iteration_index
                   for id_iter_list in self._kpi_per_id.values())


def _worker_process(queue, cls, method_name, scenario_cfg,
                    context_cfg, aborted, output_queue):  # pragma: no cover
    """Run the scenario method once per iteration, paced by the producers.

    The scenario is instantiated and set up; "setup" must return the IDs of
    the producer contexts to listen to. On each iteration the worker waits
    until every producer has reported its KPI for that iteration (or the
    timeout expires), then calls the scenario method and publishes the
    outcome.

    :param queue: queue receiving one dictionary per iteration with the keys
                  'timestamp', 'sequence', 'data' and 'errors'
    :param cls: scenario class, instantiated as
                ``cls(scenario_cfg, context_cfg)``
    :param method_name: (str) name of the scenario method called on each
                        iteration
    :param scenario_cfg: (dict) scenario configuration; the 'runner' entry
                         provides 'timeout', 'iterations' and 'run_step'
    :param context_cfg: (dict) context configuration passed to the scenario
    :param aborted: event-like object; the loop ends once ``is_set()`` is
                    true
    :param output_queue: queue receiving the non-empty values returned by
                         the scenario method
    :raises RunnerIterationIPCSetupActionNeeded: if 'setup' is not part of
                                                 'run_step'
    :raises RunnerIterationIPCNoCtxs: if the scenario "setup" returns no
                                      producer contexts
    """
    runner_cfg = scenario_cfg['runner']

    timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
    iterations = runner_cfg.get('iterations', 1)
    run_step = runner_cfg.get('run_step', 'setup,run,teardown')
    LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)

    runner_cfg['runner_id'] = os.getpid()

    benchmark = cls(scenario_cfg, context_cfg)
    method = getattr(benchmark, method_name)

    # "setup" is mandatory for this runner: it is the only source of the
    # producer context IDs the consumer has to listen to.
    if 'setup' not in run_step:
        raise exceptions.RunnerIterationIPCSetupActionNeeded()
    producer_ctxs = benchmark.setup()
    if not producer_ctxs:
        raise exceptions.RunnerIterationIPCNoCtxs()

    mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
    mq_consumer.start_rpc_server()

    # Iterations are 1-based: the consumer compares the number of KPIs
    # received per producer with this index, and the loop ends when the
    # index exceeds 'iterations'.
    iteration_index = 1
    while 'run' in run_step:
        LOG.debug('runner=%(runner)s seq=%(sequence)s START',
                  {'runner': runner_cfg['runner_id'],
                   'sequence': iteration_index})
        data = {}
        result = None
        errors = ''
        mq_consumer.iteration_index = iteration_index

        try:
            utils.wait_until_true(
                mq_consumer.is_all_kpis_received_in_iteration,
                timeout=timeout, sleep=2)
            result = method(data)
        except Exception:  # pylint: disable=broad-except
            # A failed iteration is reported through 'errors'; the loop
            # goes on with the next iteration.
            errors = traceback.format_exc()
            LOG.exception(errors)

        if result:
            output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
        # NOTE(review): the 'data' and 'errors' entries are not visible in
        # the reviewed listing; they are the only consumers of the ``data``
        # and ``errors`` locals -- confirm the key names.
        benchmark_output = {'timestamp': time.time(),
                            'sequence': iteration_index,
                            'data': data,
                            'errors': errors}
        queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)

        LOG.debug('runner=%(runner)s seq=%(sequence)s END',
                  {'runner': runner_cfg['runner_id'],
                   'sequence': iteration_index})

        iteration_index += 1
        if iteration_index > iterations or aborted.is_set():
            LOG.info('"IterationIPC" worker END')
            break

    if 'teardown' in run_step:
        try:
            benchmark.teardown()
        except Exception:  # pylint: disable=broad-except
            LOG.exception('Exception during teardown process')
            mq_consumer.stop_rpc_server()
            # NOTE(review): leave here so the RPC server is not stopped a
            # second time below; exiting with a non-zero status is assumed
            # to be the intended way to report the failure -- confirm.
            raise SystemExit(1)

    LOG.debug('Data queue size = %s', queue.qsize())
    LOG.debug('Output queue size = %s', output_queue.qsize())
    mq_consumer.stop_rpc_server()


class IterationIPCRunner(base.Runner):
    """Run a scenario for a configurable number of times.

    Each iteration has a configurable timeout. The loop control depends on the
    feedback received from the running VNFs. The context PIDs of the VNFs to
    listen for messages from are given by the scenario "setup" method.
    """
    __execution_type__ = 'IterationIPC'

    def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
        """Spawn the worker process that executes the scenario.

        :param cls: scenario class to be instantiated by the worker
        :param method: (str) name of the scenario method to call on each
                       iteration
        :param scenario_cfg: (dict) scenario configuration
        :param context_cfg: (dict) context configuration
        """
        # Process name: <execution type>-<scenario type>-<parent PID>.
        name = '{}-{}-{}'.format(
            self.__execution_type__, scenario_cfg.get('type'), os.getpid())
        self.process = multiprocessing.Process(
            name=name,
            target=_worker_process,
            args=(self.result_queue, cls, method, scenario_cfg,
                  context_cfg, self.aborted, self.output_queue))
        # NOTE(review): the start call is not visible in the reviewed
        # listing; without it the worker never runs -- confirm the base
        # class does not start the process itself.
        self.process.start()