Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / recovery_bench.py
1 """
2 Recovery system benchmarking
3 """
4 from cStringIO import StringIO
5
6 import contextlib
7 import gevent
8 import json
9 import logging
10 import random
11 import time
12
13 import ceph_manager
14 from teuthology import misc as teuthology
15
16 log = logging.getLogger(__name__)
17
18 @contextlib.contextmanager
19 def task(ctx, config):
20     """
21     Benchmark the recovery system.
22
23     Generates objects with smalliobench, runs it normally to get a
24     baseline performance measurement, then marks an OSD out and reruns
25     to measure performance during recovery.
26
27     The config should be as follows:
28
29     recovery_bench:
30         duration: <seconds for each measurement run>
31         num_objects: <number of objects>
32         io_size: <io size in bytes>
33
34     example:
35
36     tasks:
37     - ceph:
38     - recovery_bench:
39         duration: 60
40         num_objects: 500
41         io_size: 4096
42     """
43     if config is None:
44         config = {}
45     assert isinstance(config, dict), \
46         'recovery_bench task only accepts a dict for configuration'
47
48     log.info('Beginning recovery bench...')
49
50     first_mon = teuthology.get_first_mon(ctx, config)
51     (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
52
53     manager = ceph_manager.CephManager(
54         mon,
55         ctx=ctx,
56         logger=log.getChild('ceph_manager'),
57         )
58
59     num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
60     while len(manager.get_osd_status()['up']) < num_osds:
61         time.sleep(10)
62
63     bench_proc = RecoveryBencher(
64         manager,
65         config,
66         )
67     try:
68         yield
69     finally:
70         log.info('joining recovery bencher')
71         bench_proc.do_join()
72
73 class RecoveryBencher:
74     """
75     RecoveryBencher
76     """
77     def __init__(self, manager, config):
78         self.ceph_manager = manager
79         self.ceph_manager.wait_for_clean()
80
81         osd_status = self.ceph_manager.get_osd_status()
82         self.osds = osd_status['up']
83
84         self.config = config
85         if self.config is None:
86             self.config = dict()
87
88         else:
89             def tmp(x):
90                 """
91                 Local wrapper to print value.
92                 """
93                 print x
94             self.log = tmp
95
96         log.info("spawning thread")
97
98         self.thread = gevent.spawn(self.do_bench)
99
100     def do_join(self):
101         """
102         Join the recovery bencher.  This is called after the main
103         task exits.
104         """
105         self.thread.get()
106
107     def do_bench(self):
108         """
109         Do the benchmarking.
110         """
111         duration = self.config.get("duration", 60)
112         num_objects = self.config.get("num_objects", 500)
113         io_size = self.config.get("io_size", 4096)
114
115         osd = str(random.choice(self.osds))
116         (osd_remote,) = self.ceph_manager.ctx.cluster.only('osd.%s' % osd).remotes.iterkeys()
117
118         testdir = teuthology.get_testdir(self.ceph_manager.ctx)
119
120         # create the objects
121         osd_remote.run(
122             args=[
123                 'adjust-ulimits',
124                 'ceph-coverage',
125                 '{tdir}/archive/coverage'.format(tdir=testdir),
126                 'smalliobench'.format(tdir=testdir),
127                 '--use-prefix', 'recovery_bench',
128                 '--init-only', '1',
129                 '--num-objects', str(num_objects),
130                 '--io-size', str(io_size),
131                 ],
132             wait=True,
133         )
134
135         # baseline bench
136         log.info('non-recovery (baseline)')
137         p = osd_remote.run(
138             args=[
139                 'adjust-ulimits',
140                 'ceph-coverage',
141                 '{tdir}/archive/coverage'.format(tdir=testdir),
142                 'smalliobench',
143                 '--use-prefix', 'recovery_bench',
144                 '--do-not-init', '1',
145                 '--duration', str(duration),
146                 '--io-size', str(io_size),
147                 ],
148             stdout=StringIO(),
149             stderr=StringIO(),
150             wait=True,
151         )
152         self.process_samples(p.stderr.getvalue())
153
154         self.ceph_manager.raw_cluster_cmd('osd', 'out', osd)
155         time.sleep(5)
156
157         # recovery bench
158         log.info('recovery active')
159         p = osd_remote.run(
160             args=[
161                 'adjust-ulimits',
162                 'ceph-coverage',
163                 '{tdir}/archive/coverage'.format(tdir=testdir),
164                 'smalliobench',
165                 '--use-prefix', 'recovery_bench',
166                 '--do-not-init', '1',
167                 '--duration', str(duration),
168                 '--io-size', str(io_size),
169                 ],
170             stdout=StringIO(),
171             stderr=StringIO(),
172             wait=True,
173         )
174         self.process_samples(p.stderr.getvalue())
175
176         self.ceph_manager.raw_cluster_cmd('osd', 'in', osd)
177
178     def process_samples(self, input):
179         """
180         Extract samples from the input and process the results
181
182         :param input: input lines in JSON format
183         """
184         lat = {}
185         for line in input.split('\n'):
186             try:
187                 sample = json.loads(line)
188                 samples = lat.setdefault(sample['type'], [])
189                 samples.append(float(sample['latency']))
190             except Exception:
191                 pass
192
193         for type in lat:
194             samples = lat[type]
195             samples.sort()
196
197             num = len(samples)
198
199             # median
200             if num & 1 == 1: # odd number of samples
201                 median = samples[num / 2]
202             else:
203                 median = (samples[num / 2] + samples[num / 2 - 1]) / 2
204
205             # 99%
206             ninety_nine = samples[int(num * 0.99)]
207
208             log.info("%s: median %f, 99%% %f" % (type, median, ninety_nine))