Merge "Separate out test_parse_to_value_exception()"
[yardstick.git] / yardstick / benchmark / runners / iteration_ipc.py
1 # Copyright 2018: Intel Corporation
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 """A runner that runs a configurable number of times before it returns. Each
17    iteration has a configurable timeout. The loop control depends on the
18    feedback received from the running VNFs. The context PIDs from the VNFs
19    to listen the messages from are given in the scenario "setup" method.
20 """
21
22 import logging
23 import multiprocessing
24 import time
25 import traceback
26
27 import os
28
29 from yardstick.benchmark.runners import base as base_runner
30 from yardstick.common import exceptions
31 from yardstick.common import messaging
32 from yardstick.common import utils
33 from yardstick.common.messaging import consumer
34 from yardstick.common.messaging import payloads
35
36
37 LOG = logging.getLogger(__name__)
38
39 QUEUE_PUT_TIMEOUT = 10
40 ITERATION_TIMEOUT = 180
41
42
43 class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
44     """Endpoint class for ``RunnerIterationIPCConsumer``"""
45
46     def tg_method_started(self, ctxt, **kwargs):
47         if ctxt['id'] in self._ctx_ids:
48             self._queue.put(
49                 {'id': ctxt['id'],
50                  'action': messaging.TG_METHOD_STARTED,
51                  'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
52                      kwargs)},
53                 QUEUE_PUT_TIMEOUT)
54
55     def tg_method_finished(self, ctxt, **kwargs):
56         if ctxt['id'] in self._ctx_ids:
57             self._queue.put(
58                 {'id': ctxt['id'],
59                  'action': messaging.TG_METHOD_FINISHED,
60                  'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
61                      kwargs)})
62
63     def tg_method_iteration(self, ctxt, **kwargs):
64         if ctxt['id'] in self._ctx_ids:
65             self._queue.put(
66                 {'id': ctxt['id'],
67                  'action': messaging.TG_METHOD_ITERATION,
68                  'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
69                      kwargs)})
70
71
72 class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
73     """MQ consumer for "IterationIPC" runner"""
74
75     def __init__(self, _id, ctx_ids):
76         self._id = _id
77         self._queue = multiprocessing.Queue()
78         endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
79         super(RunnerIterationIPCConsumer, self).__init__(
80             messaging.TOPIC_TG, ctx_ids, endpoints)
81         self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
82         self.iteration_index = None
83
84     def is_all_kpis_received_in_iteration(self):
85         """Check if all producers registered have sent the ITERATION msg
86
87         During the present iteration, all producers (traffic generators) must
88         start and finish the traffic injection, and at the end of the traffic
89         injection a TG_METHOD_ITERATION must be sent. This function will check
90         all KPIs in the present iteration are received. E.g.:
91           self.iteration_index = 2
92
93           self._kpi_per_id = {
94             'ctx1': [kpi0, kpi1, kpi2],
95             'ctx2': [kpi0, kpi1]}          --> return False
96
97           self._kpi_per_id = {
98             'ctx1': [kpi0, kpi1, kpi2],
99             'ctx2': [kpi0, kpi1, kpi2]}    --> return True
100         """
101         while not self._queue.empty():
102             msg = self._queue.get(True, 1)
103             if msg['action'] == messaging.TG_METHOD_ITERATION:
104                 id_iter_list = self._kpi_per_id[msg['id']]
105                 id_iter_list.append(msg['payload'].kpi)
106
107         return all(len(id_iter_list) == self.iteration_index
108                    for id_iter_list in self._kpi_per_id.values())
109
110
111 def _worker_process(queue, cls, method_name, scenario_cfg,
112                     context_cfg, aborted, output_queue):  # pragma: no cover
113     runner_cfg = scenario_cfg['runner']
114
115     timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
116     iterations = runner_cfg.get('iterations', 1)
117     run_step = runner_cfg.get('run_step', 'setup,run,teardown')
118     LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
119
120     runner_cfg['runner_id'] = os.getpid()
121
122     benchmark = cls(scenario_cfg, context_cfg)
123     method = getattr(benchmark, method_name)
124
125     if 'setup' not in run_step:
126         raise exceptions.RunnerIterationIPCSetupActionNeeded()
127     benchmark.setup()
128     producer_ctxs = benchmark.get_mq_ids()
129     if not producer_ctxs:
130         raise exceptions.RunnerIterationIPCNoCtxs()
131
132     mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
133     mq_consumer.start_rpc_server()
134     mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id'])
135
136     iteration_index = 1
137     while 'run' in run_step:
138         LOG.debug('runner=%(runner)s seq=%(sequence)s START',
139                   {'runner': runner_cfg['runner_id'],
140                    'sequence': iteration_index})
141         data = {}
142         result = None
143         errors = ''
144         mq_consumer.iteration_index = iteration_index
145         mq_producer.start_iteration()
146
147         try:
148             utils.wait_until_true(
149                 mq_consumer.is_all_kpis_received_in_iteration,
150                 timeout=timeout, sleep=2)
151             result = method(data)
152         except Exception:  # pylint: disable=broad-except
153             errors = traceback.format_exc()
154             LOG.exception(errors)
155
156         mq_producer.stop_iteration()
157
158         if result:
159             output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
160         benchmark_output = {'timestamp': time.time(),
161                             'sequence': iteration_index,
162                             'data': data,
163                             'errors': errors}
164         queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
165
166         LOG.debug('runner=%(runner)s seq=%(sequence)s END',
167                   {'runner': runner_cfg['runner_id'],
168                    'sequence': iteration_index})
169
170         iteration_index += 1
171         if iteration_index > iterations or aborted.is_set():
172             LOG.info('"IterationIPC" worker END')
173             break
174
175     if 'teardown' in run_step:
176         try:
177             benchmark.teardown()
178         except Exception:
179             LOG.exception('Exception during teardown process')
180             mq_consumer.stop_rpc_server()
181             raise SystemExit(1)
182
183     LOG.debug('Data queue size = %s', queue.qsize())
184     LOG.debug('Output queue size = %s', output_queue.qsize())
185     mq_consumer.stop_rpc_server()
186
187
188 class IterationIPCRunner(base_runner.Runner):
189     """Run a scenario for a configurable number of times.
190
191     Each iteration has a configurable timeout. The loop control depends on the
192     feedback received from the running VNFs. The context PIDs from the VNFs to
193     listen the messages from are given in the scenario "setup" method.
194     """
195     __execution_type__ = 'IterationIPC'
196
197     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
198         name = '{}-{}-{}'.format(
199             self.__execution_type__, scenario_cfg.get('type'), os.getpid())
200         self.process = multiprocessing.Process(
201             name=name,
202             target=_worker_process,
203             args=(self.result_queue, cls, method, scenario_cfg,
204                   context_cfg, self.aborted, self.output_queue))
205         self.process.start()