Enable traffic generator PID in "NSPerf" scenario setup
[yardstick.git] / yardstick / benchmark / runners / iteration_ipc.py
1 # Copyright 2018: Intel Corporation
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 """A runner that runs a configurable number of times before it returns. Each
17    iteration has a configurable timeout. The loop control depends on the
18    feedback received from the running VNFs. The context PIDs from the VNFs
19    to listen the messages from are given in the scenario "setup" method.
20 """
21
22 import logging
23 import multiprocessing
24 import time
25 import traceback
26
27 import os
28
29 from yardstick.benchmark.runners import base
30 from yardstick.common import exceptions
31 from yardstick.common import messaging
32 from yardstick.common import utils
33 from yardstick.common.messaging import consumer
34 from yardstick.common.messaging import payloads
35
36
37 LOG = logging.getLogger(__name__)
38
39 QUEUE_PUT_TIMEOUT = 10
40 ITERATION_TIMEOUT = 180
41
42
43 class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
44     """Endpoint class for ``RunnerIterationIPCConsumer``"""
45
46     def tg_method_started(self, ctxt, **kwargs):
47         if ctxt['id'] in self._ctx_ids:
48             self._queue.put(
49                 {'id': ctxt['id'],
50                  'action': messaging.TG_METHOD_STARTED,
51                  'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
52                      kwargs)},
53                 QUEUE_PUT_TIMEOUT)
54
55     def tg_method_finished(self, ctxt, **kwargs):
56         if ctxt['id'] in self._ctx_ids:
57             self._queue.put(
58                 {'id': ctxt['id'],
59                  'action': messaging.TG_METHOD_FINISHED,
60                  'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
61                      kwargs)})
62
63     def tg_method_iteration(self, ctxt, **kwargs):
64         if ctxt['id'] in self._ctx_ids:
65             self._queue.put(
66                 {'id': ctxt['id'],
67                  'action': messaging.TG_METHOD_ITERATION,
68                  'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
69                      kwargs)})
70
71
72 class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
73     """MQ consumer for "IterationIPC" runner"""
74
75     def __init__(self, _id, ctx_ids):
76         self._id = _id
77         self._queue = multiprocessing.Queue()
78         endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
79         super(RunnerIterationIPCConsumer, self).__init__(
80             messaging.TOPIC_TG, ctx_ids, endpoints)
81         self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
82         self.iteration_index = None
83
84     def is_all_kpis_received_in_iteration(self):
85         """Check if all producers registered have sent the ITERATION msg
86
87         During the present iteration, all producers (traffic generators) must
88         start and finish the traffic injection, and at the end of the traffic
89         injection a TG_METHOD_ITERATION must be sent. This function will check
90         all KPIs in the present iteration are received. E.g.:
91           self.iteration_index = 2
92
93           self._kpi_per_id = {
94             'ctx1': [kpi0, kpi1, kpi2],
95             'ctx2': [kpi0, kpi1]}          --> return False
96
97           self._kpi_per_id = {
98             'ctx1': [kpi0, kpi1, kpi2],
99             'ctx2': [kpi0, kpi1, kpi2]}    --> return True
100         """
101         while not self._queue.empty():
102             msg = self._queue.get(True, 1)
103             if msg['action'] == messaging.TG_METHOD_ITERATION:
104                 id_iter_list = self._kpi_per_id[msg['id']]
105                 id_iter_list.append(msg['payload'].kpi)
106
107         return all(len(id_iter_list) == self.iteration_index
108                    for id_iter_list in self._kpi_per_id.values())
109
110
111 def _worker_process(queue, cls, method_name, scenario_cfg,
112                     context_cfg, aborted, output_queue):  # pragma: no cover
113     runner_cfg = scenario_cfg['runner']
114
115     timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
116     iterations = runner_cfg.get('iterations', 1)
117     run_step = runner_cfg.get('run_step', 'setup,run,teardown')
118     LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
119
120     runner_cfg['runner_id'] = os.getpid()
121
122     benchmark = cls(scenario_cfg, context_cfg)
123     method = getattr(benchmark, method_name)
124
125     if 'setup' not in run_step:
126         raise exceptions.RunnerIterationIPCSetupActionNeeded()
127     benchmark.setup()
128     producer_ctxs = benchmark.get_mq_ids()
129     if not producer_ctxs:
130         raise exceptions.RunnerIterationIPCNoCtxs()
131
132     mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
133     mq_consumer.start_rpc_server()
134
135     iteration_index = 1
136     while 'run' in run_step:
137         LOG.debug('runner=%(runner)s seq=%(sequence)s START',
138                   {'runner': runner_cfg['runner_id'],
139                    'sequence': iteration_index})
140         data = {}
141         result = None
142         errors = ''
143         mq_consumer.iteration_index = iteration_index
144
145         try:
146             utils.wait_until_true(
147                 mq_consumer.is_all_kpis_received_in_iteration,
148                 timeout=timeout, sleep=2)
149             result = method(data)
150         except Exception:  # pylint: disable=broad-except
151             errors = traceback.format_exc()
152             LOG.exception(errors)
153
154         if result:
155             output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
156         benchmark_output = {'timestamp': time.time(),
157                             'sequence': iteration_index,
158                             'data': data,
159                             'errors': errors}
160         queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
161
162         LOG.debug('runner=%(runner)s seq=%(sequence)s END',
163                   {'runner': runner_cfg['runner_id'],
164                    'sequence': iteration_index})
165
166         iteration_index += 1
167         if iteration_index > iterations or aborted.is_set():
168             LOG.info('"IterationIPC" worker END')
169             break
170
171     if 'teardown' in run_step:
172         try:
173             benchmark.teardown()
174         except Exception:
175             LOG.exception('Exception during teardown process')
176             mq_consumer.stop_rpc_server()
177             raise SystemExit(1)
178
179     LOG.debug('Data queue size = %s', queue.qsize())
180     LOG.debug('Output queue size = %s', output_queue.qsize())
181     mq_consumer.stop_rpc_server()
182
183
184 class IterationIPCRunner(base.Runner):
185     """Run a scenario for a configurable number of times.
186
187     Each iteration has a configurable timeout. The loop control depends on the
188     feedback received from the running VNFs. The context PIDs from the VNFs to
189     listen the messages from are given in the scenario "setup" method.
190     """
191     __execution_type__ = 'IterationIPC'
192
193     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
194         name = '{}-{}-{}'.format(
195             self.__execution_type__, scenario_cfg.get('type'), os.getpid())
196         self.process = multiprocessing.Process(
197             name=name,
198             target=_worker_process,
199             args=(self.result_queue, cls, method, scenario_cfg,
200                   context_cfg, self.aborted, self.output_queue))
201         self.process.start()