Merge "Update release note for Danube.3.2"
[yardstick.git] / yardstick / benchmark / runners / search.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/constant.py
18
19 """A runner that runs a specific time before it returns
20 """
21
22 from __future__ import absolute_import
23 import os
24 import multiprocessing
25 import logging
26 import traceback
27 import time
28
29 from collections import Mapping
30 from contextlib import contextmanager
31 from itertools import takewhile
32 from six.moves import zip
33
34 from yardstick.benchmark.runners import base
35
36 LOG = logging.getLogger(__name__)
37
38
39 class SearchRunnerHelper(object):
40
41     def __init__(self, cls, method_name, scenario_cfg, context_cfg, aborted):
42         super(SearchRunnerHelper, self).__init__()
43         self.cls = cls
44         self.method_name = method_name
45         self.scenario_cfg = scenario_cfg
46         self.context_cfg = context_cfg
47         self.aborted = aborted
48         self.runner_cfg = scenario_cfg['runner']
49         self.run_step = self.runner_cfg.get("run_step", "setup,run,teardown")
50         self.timeout = self.runner_cfg.get("timeout", 60)
51         self.interval = self.runner_cfg.get("interval", 1)
52         self.benchmark = None
53         self.method = None
54
55     def __call__(self, *args, **kwargs):
56         if self.method is None:
57             raise RuntimeError
58         return self.method(*args, **kwargs)
59
60     @contextmanager
61     def get_benchmark_instance(self):
62         self.benchmark = self.cls(self.scenario_cfg, self.context_cfg)
63
64         if 'setup' in self.run_step:
65             self.benchmark.setup()
66
67         self.method = getattr(self.benchmark, self.method_name)
68         LOG.info("worker START, timeout %d sec, class %s", self.timeout, self.cls)
69         try:
70             yield self
71         finally:
72             if 'teardown' in self.run_step:
73                 self.benchmark.teardown()
74
75     def is_not_done(self):
76         if 'run' not in self.run_step:
77             raise StopIteration
78
79         max_time = time.time() + self.timeout
80
81         abort_iter = iter(self.aborted.is_set, True)
82         time_iter = takewhile(lambda t_now: t_now <= max_time, iter(time.time, -1))
83
84         for seq, _ in enumerate(zip(abort_iter, time_iter), 1):
85             yield seq
86             time.sleep(self.interval)
87
88
89 class SearchRunner(base.Runner):
90     """Run a scenario for a certain amount of time
91
92 If the scenario ends before the time has elapsed, it will be started again.
93
94   Parameters
95     timeout - amount of time the scenario will be run for
96         type:    int
97         unit:    seconds
98         default: 1 sec
99     interval - time to wait between each scenario invocation
100         type:    int
101         unit:    seconds
102         default: 1 sec
103     """
104     __execution_type__ = 'Search'
105
106     def __init__(self, config):
107         super(SearchRunner, self).__init__(config)
108         self.runner_cfg = None
109         self.runner_id = None
110         self.sla_action = None
111         self.worker_helper = None
112
113     def _worker_run_once(self, sequence):
114         LOG.debug("runner=%s seq=%s START", self.runner_id, sequence)
115
116         data = {}
117         errors = ""
118
119         try:
120             self.worker_helper(data)
121         except AssertionError as assertion:
122             # SLA validation failed in scenario, determine what to do now
123             if self.sla_action == "assert":
124                 raise
125             elif self.sla_action == "monitor":
126                 LOG.warning("SLA validation failed: %s", assertion.args)
127                 errors = assertion.args
128         except Exception as e:
129             errors = traceback.format_exc()
130             LOG.exception(e)
131
132         record = {
133             'runner_id': self.runner_id,
134             'benchmark': {
135                 'timestamp': time.time(),
136                 'sequence': sequence,
137                 'data': data,
138                 'errors': errors,
139             },
140         }
141
142         self.result_queue.put(record)
143
144         LOG.debug("runner=%s seq=%s END", self.runner_id, sequence)
145
146         # Have to search through all the VNF KPIs
147         kpi_done = any(kpi.get('done') for kpi in data.values() if isinstance(kpi, Mapping))
148
149         return kpi_done or (errors and self.sla_action is None)
150
151     def _worker_run(self, cls, method_name, scenario_cfg, context_cfg):
152         self.runner_cfg = scenario_cfg['runner']
153         self.runner_id = self.runner_cfg['runner_id'] = os.getpid()
154
155         self.worker_helper = SearchRunnerHelper(cls, method_name, scenario_cfg,
156                                                 context_cfg, self.aborted)
157
158         try:
159             self.sla_action = scenario_cfg['sla'].get('action', 'assert')
160         except KeyError:
161             self.sla_action = None
162
163         self.result_queue.put({
164             'runner_id': self.runner_id,
165             'scenario_cfg': scenario_cfg,
166             'context_cfg': context_cfg
167         })
168
169         with self.worker_helper.get_benchmark_instance():
170             for sequence in self.worker_helper.is_not_done():
171                 if self._worker_run_once(sequence):
172                     LOG.info("worker END")
173                     break
174
175     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
176         self.process = multiprocessing.Process(
177             target=self._worker_run,
178             args=(cls, method, scenario_cfg, context_cfg))
179         self.process.start()