Add "IterationIPC" runner
[yardstick.git] / yardstick / benchmark / runners / iteration_ipc.py
1 # Copyright 2018: Intel Corporation
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 """A runner that runs a configurable number of times before it returns. Each
17    iteration has a configurable timeout. The loop control depends on the
18    feedback received from the running VNFs. The context PIDs from the VNFs
19    to listen the messages from are given in the scenario "setup" method.
20 """
21
22 import logging
23 import multiprocessing
24 import time
25 import traceback
26
27 import os
28
29 from yardstick.benchmark.runners import base
30 from yardstick.common import exceptions
31 from yardstick.common import messaging
32 from yardstick.common import utils
33 from yardstick.common.messaging import consumer
34 from yardstick.common.messaging import payloads
35
36
37 LOG = logging.getLogger(__name__)
38
39 QUEUE_PUT_TIMEOUT = 10
40 ITERATION_TIMEOUT = 180
41
42
43 class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
44     """Endpoint class for ``RunnerIterationIPCConsumer``"""
45
46     def tg_method_started(self, ctxt, **kwargs):
47         if ctxt['id'] in self._ctx_pids:
48             self._queue.put(
49                 {'id': ctxt['id'],
50                  'action': messaging.TG_METHOD_STARTED,
51                  'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
52                      kwargs)},
53                 QUEUE_PUT_TIMEOUT)
54
55     def tg_method_finished(self, ctxt, **kwargs):
56         if ctxt['id'] in self._ctx_pids:
57             self._queue.put(
58                 {'id': ctxt['id'],
59                  'action': messaging.TG_METHOD_FINISHED,
60                  'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
61                      kwargs)})
62
63     def tg_method_iteration(self, ctxt, **kwargs):
64         if ctxt['id'] in self._ctx_pids:
65             self._queue.put(
66                 {'id': ctxt['id'],
67                  'action': messaging.TG_METHOD_ITERATION,
68                  'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
69                      kwargs)})
70
71
72 class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
73     """MQ consumer for "IterationIPC" runner"""
74
75     def __init__(self, _id, ctx_ids):
76         self._id = _id
77         self._queue = multiprocessing.Queue()
78         endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
79         super(RunnerIterationIPCConsumer, self).__init__(
80             messaging.TOPIC_TG, ctx_ids, endpoints)
81         self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
82         self.iteration_index = None
83
84     def is_all_kpis_received_in_iteration(self):
85         """Check if all producers registered have sent the ITERATION msg
86
87         During the present iteration, all producers (traffic generators) must
88         start and finish the traffic injection, and at the end of the traffic
89         injection a TG_METHOD_ITERATION must be sent. This function will check
90         all KPIs in the present iteration are received. E.g.:
91           self.iteration_index = 2
92
93           self._kpi_per_id = {
94             'ctx1': [kpi0, kpi1, kpi2],
95             'ctx2': [kpi0, kpi1]}          --> return False
96
97           self._kpi_per_id = {
98             'ctx1': [kpi0, kpi1, kpi2],
99             'ctx2': [kpi0, kpi1, kpi2]}    --> return True
100         """
101         while not self._queue.empty():
102             msg = self._queue.get(True, 1)
103             if msg['action'] == messaging.TG_METHOD_ITERATION:
104                 id_iter_list = self._kpi_per_id[msg['id']]
105                 id_iter_list.append(msg['payload'].kpi)
106
107         return all(len(id_iter_list) == self.iteration_index
108                    for id_iter_list in self._kpi_per_id.values())
109
110
111 def _worker_process(queue, cls, method_name, scenario_cfg,
112                     context_cfg, aborted, output_queue):  # pragma: no cover
113     runner_cfg = scenario_cfg['runner']
114
115     timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
116     iterations = runner_cfg.get('iterations', 1)
117     run_step = runner_cfg.get('run_step', 'setup,run,teardown')
118     LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
119
120     runner_cfg['runner_id'] = os.getpid()
121
122     benchmark = cls(scenario_cfg, context_cfg)
123     method = getattr(benchmark, method_name)
124
125     if 'setup' not in run_step:
126         raise exceptions.RunnerIterationIPCSetupActionNeeded()
127     producer_ctxs = benchmark.setup()
128     if not producer_ctxs:
129         raise exceptions.RunnerIterationIPCNoCtxs()
130
131     mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
132     mq_consumer.start_rpc_server()
133
134     iteration_index = 1
135     while 'run' in run_step:
136         LOG.debug('runner=%(runner)s seq=%(sequence)s START',
137                   {'runner': runner_cfg['runner_id'],
138                    'sequence': iteration_index})
139         data = {}
140         result = None
141         errors = ''
142         mq_consumer.iteration_index = iteration_index
143
144         try:
145             utils.wait_until_true(
146                 mq_consumer.is_all_kpis_received_in_iteration,
147                 timeout=timeout, sleep=2)
148             result = method(data)
149         except Exception:  # pylint: disable=broad-except
150             errors = traceback.format_exc()
151             LOG.exception(errors)
152
153         if result:
154             output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
155         benchmark_output = {'timestamp': time.time(),
156                             'sequence': iteration_index,
157                             'data': data,
158                             'errors': errors}
159         queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
160
161         LOG.debug('runner=%(runner)s seq=%(sequence)s END',
162                   {'runner': runner_cfg['runner_id'],
163                    'sequence': iteration_index})
164
165         iteration_index += 1
166         if iteration_index > iterations or aborted.is_set():
167             LOG.info('"IterationIPC" worker END')
168             break
169
170     if 'teardown' in run_step:
171         try:
172             benchmark.teardown()
173         except Exception:
174             LOG.exception('Exception during teardown process')
175             mq_consumer.stop_rpc_server()
176             raise SystemExit(1)
177
178     LOG.debug('Data queue size = %s', queue.qsize())
179     LOG.debug('Output queue size = %s', output_queue.qsize())
180     mq_consumer.stop_rpc_server()
181
182
183 class IterationIPCRunner(base.Runner):
184     """Run a scenario for a configurable number of times.
185
186     Each iteration has a configurable timeout. The loop control depends on the
187     feedback received from the running VNFs. The context PIDs from the VNFs to
188     listen the messages from are given in the scenario "setup" method.
189     """
190     __execution_type__ = 'IterationIPC'
191
192     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
193         name = '{}-{}-{}'.format(
194             self.__execution_type__, scenario_cfg.get('type'), os.getpid())
195         self.process = multiprocessing.Process(
196             name=name,
197             target=_worker_process,
198             args=(self.result_queue, cls, method, scenario_cfg,
199                   context_cfg, self.aborted, self.output_queue))
200         self.process.start()