Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / cephfs_test_runner.py
1 import contextlib
2 import logging
3 import os
4 import unittest
5 from unittest import suite, loader, case
6 from teuthology.task import interactive
7 from teuthology import misc
8 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
9 from tasks.mgr.mgr_test_case import MgrCluster
10
11 log = logging.getLogger(__name__)
12
13
14 class DecoratingLoader(loader.TestLoader):
15     """
16     A specialization of TestLoader that tags some extra attributes
17     onto test classes as they are loaded.
18     """
19     def __init__(self, params):
20         self._params = params
21         super(DecoratingLoader, self).__init__()
22
23     def _apply_params(self, obj):
24         for k, v in self._params.items():
25             setattr(obj, k, v)
26
27     def loadTestsFromTestCase(self, testCaseClass):
28         self._apply_params(testCaseClass)
29         return super(DecoratingLoader, self).loadTestsFromTestCase(testCaseClass)
30
31     def loadTestsFromName(self, name, module=None):
32         result = super(DecoratingLoader, self).loadTestsFromName(name, module)
33
34         # Special case for when we were called with the name of a method, we get
35         # a suite with one TestCase
36         tests_in_result = list(result)
37         if len(tests_in_result) == 1 and isinstance(tests_in_result[0], case.TestCase):
38             self._apply_params(tests_in_result[0])
39
40         return result
41
42
43 class LogStream(object):
44     def __init__(self):
45         self.buffer = ""
46
47     def write(self, data):
48         self.buffer += data
49         if "\n" in self.buffer:
50             lines = self.buffer.split("\n")
51             for line in lines[:-1]:
52                 log.info(line)
53             self.buffer = lines[-1]
54
55     def flush(self):
56         pass
57
58
59 class InteractiveFailureResult(unittest.TextTestResult):
60     """
61     Specialization that implements interactive-on-error style
62     behavior.
63     """
64     ctx = None
65
66     def addFailure(self, test, err):
67         log.error(self._exc_info_to_string(err, test))
68         log.error("Failure in test '{0}', going interactive".format(
69             self.getDescription(test)
70         ))
71         interactive.task(ctx=self.ctx, config=None)
72
73     def addError(self, test, err):
74         log.error(self._exc_info_to_string(err, test))
75         log.error("Error in test '{0}', going interactive".format(
76             self.getDescription(test)
77         ))
78         interactive.task(ctx=self.ctx, config=None)
79
80
81 @contextlib.contextmanager
82 def task(ctx, config):
83     """
84     Run the CephFS test cases.
85
86     Run everything in tasks/cephfs/test_*.py:
87
88     ::
89
90         tasks:
91           - install:
92           - ceph:
93           - ceph-fuse:
94           - cephfs_test_runner:
95
96     `modules` argument allows running only some specific modules:
97
98     ::
99
100         tasks:
101             ...
102           - cephfs_test_runner:
103               modules:
104                 - tasks.cephfs.test_sessionmap
105                 - tasks.cephfs.test_auto_repair
106
107     By default, any cases that can't be run on the current cluster configuration
108     will generate a failure.  When the optional `fail_on_skip` argument is set
109     to false, any tests that can't be run on the current configuration will
110     simply be skipped:
111
112     ::
113         tasks:
114             ...
115          - cephfs_test_runner:
116            fail_on_skip: false
117
118     """
119
120     ceph_cluster = CephCluster(ctx)
121
122     if len(list(misc.all_roles_of_type(ctx.cluster, 'mds'))):
123         mds_cluster = MDSCluster(ctx)
124         fs = Filesystem(ctx)
125     else:
126         mds_cluster = None
127         fs = None
128
129     if len(list(misc.all_roles_of_type(ctx.cluster, 'mgr'))):
130         mgr_cluster = MgrCluster(ctx)
131     else:
132         mgr_cluster = None
133
134     # Mount objects, sorted by ID
135     if hasattr(ctx, 'mounts'):
136         mounts = [v for k, v in sorted(ctx.mounts.items(), lambda a, b: cmp(a[0], b[0]))]
137     else:
138         # The test configuration has a filesystem but no fuse/kclient mounts
139         mounts = []
140
141     decorating_loader = DecoratingLoader({
142         "ctx": ctx,
143         "mounts": mounts,
144         "fs": fs,
145         "ceph_cluster": ceph_cluster,
146         "mds_cluster": mds_cluster,
147         "mgr_cluster": mgr_cluster,
148     })
149
150     fail_on_skip = config.get('fail_on_skip', True)
151
152     # Put useful things onto ctx for interactive debugging
153     ctx.fs = fs
154     ctx.mds_cluster = mds_cluster
155     ctx.mgr_cluster = mgr_cluster
156
157     # Depending on config, either load specific modules, or scan for moduless
158     if config and 'modules' in config and config['modules']:
159         module_suites = []
160         for mod_name in config['modules']:
161             # Test names like cephfs.test_auto_repair
162             module_suites.append(decorating_loader.loadTestsFromName(mod_name))
163         overall_suite = suite.TestSuite(module_suites)
164     else:
165         # Default, run all tests
166         overall_suite = decorating_loader.discover(
167             os.path.join(
168                 os.path.dirname(os.path.abspath(__file__)),
169                 "cephfs/"
170             )
171         )
172
173     if ctx.config.get("interactive-on-error", False):
174         InteractiveFailureResult.ctx = ctx
175         result_class = InteractiveFailureResult
176     else:
177         result_class = unittest.TextTestResult
178
179     class LoggingResult(result_class):
180         def startTest(self, test):
181             log.info("Starting test: {0}".format(self.getDescription(test)))
182             return super(LoggingResult, self).startTest(test)
183
184         def addSkip(self, test, reason):
185             if fail_on_skip:
186                 # Don't just call addFailure because that requires a traceback
187                 self.failures.append((test, reason))
188             else:
189                 super(LoggingResult, self).addSkip(test, reason)
190
191     # Execute!
192     result = unittest.TextTestRunner(
193         stream=LogStream(),
194         resultclass=LoggingResult,
195         verbosity=2,
196         failfast=True).run(overall_suite)
197
198     if not result.wasSuccessful():
199         result.printErrors()  # duplicate output at end for convenience
200
201         bad_tests = []
202         for test, error in result.errors:
203             bad_tests.append(str(test))
204         for test, failure in result.failures:
205             bad_tests.append(str(test))
206
207         raise RuntimeError("Test failure: {0}".format(", ".join(bad_tests)))
208
209     yield