# License for the specific language governing permissions and limitations
# under the License.
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/scenarios/base.py
+import abc
+import time
-""" Scenario base class
-"""
+import six
+from stevedore import extension
-from __future__ import absolute_import
import yardstick.common.utils as utils
+from yardstick.common import exceptions as y_exc
+def _iter_scenario_classes(scenario_type=None):
+ """Generator over all 'Scenario' subclasses
+
+ This function will iterate over all 'Scenario' subclasses defined in this
+ project and will load any class introduced by any installed plugin project,
+ defined in 'entry_points' section, under 'yardstick.scenarios' subsection.
+ """
+ extension.ExtensionManager(namespace='yardstick.scenarios',
+ invoke_on_load=False)
+ for scenario in utils.itersubclasses(Scenario):
+ if not scenario_type:
+ yield scenario
+ elif getattr(scenario, '__scenario_type__', None) == scenario_type:
+ yield scenario
+
+
+@six.add_metaclass(abc.ABCMeta)
class Scenario(object):
def setup(self):
- """ default impl for scenario setup """
+ """Default setup implementation for Scenario classes"""
pass
- def run(self, args):
- """ catcher for not implemented run methods in subclasses """
- raise RuntimeError("run method not implemented")
+ @abc.abstractmethod
+ def run(self, *args):
+ """Entry point for scenario classes, called from runner worker"""
def teardown(self):
- """ default impl for scenario teardown """
+ """Default teardown implementation for Scenario classes"""
+ pass
+
+ def pre_run_wait_time(self, time_seconds):
+ """Time waited before executing the run method"""
pass
+ def post_run_wait_time(self, time_seconds):
+ """Time waited after executing the run method"""
+ time.sleep(time_seconds)
+
+ def verify_SLA(self, condition, error_msg):
+ if not condition:
+ raise y_exc.SLAValidationError(
+ case_name=self.__scenario_type__, error_msg=error_msg)
+
@staticmethod
def get_types():
"""return a list of known runner type (class) names"""
scenarios = []
- for scenario in utils.itersubclasses(Scenario):
+ for scenario in _iter_scenario_classes():
scenarios.append(scenario)
return scenarios
@staticmethod
def get_cls(scenario_type):
"""return class of specified type"""
- for scenario in utils.itersubclasses(Scenario):
- if scenario_type == scenario.__scenario_type__:
- return scenario
+ for scenario in _iter_scenario_classes(scenario_type):
+ return scenario
raise RuntimeError("No such scenario type %s" % scenario_type)
def get(scenario_type):
"""Returns instance of a scenario runner for execution type.
"""
- for scenario in utils.itersubclasses(Scenario):
- if scenario_type == scenario.__scenario_type__:
- return scenario.__module__ + "." + scenario.__name__
+ scenario = Scenario.get_cls(scenario_type)
+ return scenario.__module__ + "." + scenario.__name__
- raise RuntimeError("No such scenario type %s" % scenario_type)
+ @classmethod
+ def get_scenario_type(cls):
+ """Return a string with the scenario type, if defined"""
+ return str(getattr(cls, '__scenario_type__', None))
+
+ @classmethod
+ def get_description(cls):
+ """Return a single line string with the class description
+
+ This function will retrieve the class docstring and return the first
+ line, or 'None' if it's empty.
+ """
+ return cls.__doc__.splitlines()[0] if cls.__doc__ else str(None)
+
+ @staticmethod
+ def _push_to_outputs(keys, values):
+ """Return a dictionary given the keys and the values"""
+ return dict(zip(keys, values))
+
+ @staticmethod
+ def _change_obj_to_dict(obj):
+ """Return a dictionary from the __dict__ attribute of an object"""
+ dic = {}
+ for k, v in vars(obj).items():
+ try:
+ vars(v)
+ except TypeError:
+ dic[k] = v
+ return dic