Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / radosbenchsweep.py
1 """
2 Rados benchmarking sweep
3 """
4 import contextlib
5 import logging
6 import re
7
8 from cStringIO import StringIO
9 from itertools import product
10
11 from teuthology.orchestra import run
12 from teuthology import misc as teuthology
13
14 log = logging.getLogger(__name__)
15
16
17 @contextlib.contextmanager
18 def task(ctx, config):
19     """
20     Execute a radosbench parameter sweep
21
22     Puts radosbench in a loop, taking values from the given config at each
23     iteration. If given, the min and max values below create a range, e.g.
24     min_replicas=1 and max_replicas=3 implies executing with 1-3 replicas.
25
26     Parameters:
27
28         clients: [client list]
29         time: seconds to run (default=120)
30         sizes: [list of object sizes] (default=[4M])
31         mode: <write|read|seq> (default=write)
32         repetitions: execute the same configuration multiple times (default=1)
33         min_num_replicas: minimum number of replicas to use (default = 3)
34         max_num_replicas: maximum number of replicas to use (default = 3)
35         min_num_osds: the minimum number of OSDs in a pool (default=all)
36         max_num_osds: the maximum number of OSDs in a pool (default=all)
37         file: name of CSV-formatted output file (default='radosbench.csv')
38         columns: columns to include (default=all)
39           - rep: execution number (takes values from 'repetitions')
40           - num_osd: number of osds for pool
41           - num_replica: number of replicas
42           - avg_throughput: throughput
43           - avg_latency: latency
44           - stdev_throughput:
45           - stdev_latency:
46
47     Example:
48     - radsobenchsweep:
49         columns: [rep, num_osd, num_replica, avg_throughput, stdev_throughput]
50     """
51     log.info('Beginning radosbenchsweep...')
52     assert isinstance(config, dict), 'expecting dictionary for configuration'
53
54     # get and validate config values
55     # {
56
57     # only one client supported for now
58     if len(config.get('clients', [])) != 1:
59         raise Exception("Only one client can be specified")
60
61     # only write mode
62     if config.get('mode', 'write') != 'write':
63         raise Exception("Only 'write' mode supported for now.")
64
65     # OSDs
66     total_osds_in_cluster = teuthology.num_instances_of_type(ctx.cluster, 'osd')
67     min_num_osds = config.get('min_num_osds', total_osds_in_cluster)
68     max_num_osds = config.get('max_num_osds', total_osds_in_cluster)
69
70     if max_num_osds > total_osds_in_cluster:
71         raise Exception('max_num_osds cannot be greater than total in cluster')
72     if min_num_osds < 1:
73         raise Exception('min_num_osds cannot be less than 1')
74     if min_num_osds > max_num_osds:
75         raise Exception('min_num_osds cannot be greater than max_num_osd')
76     osds = range(0, (total_osds_in_cluster + 1))
77
78     # replicas
79     min_num_replicas = config.get('min_num_replicas', 3)
80     max_num_replicas = config.get('max_num_replicas', 3)
81
82     if min_num_replicas < 1:
83         raise Exception('min_num_replicas cannot be less than 1')
84     if min_num_replicas > max_num_replicas:
85         raise Exception('min_num_replicas cannot be greater than max_replicas')
86     if max_num_replicas > max_num_osds:
87         raise Exception('max_num_replicas cannot be greater than max_num_osds')
88     replicas = range(min_num_replicas, (max_num_replicas + 1))
89
90     # object size
91     sizes = config.get('size', [4 << 20])
92
93     # repetitions
94     reps = range(config.get('repetitions', 1))
95
96     # file
97     fname = config.get('file', 'radosbench.csv')
98     f = open('{}/{}'.format(ctx.archive, fname), 'w')
99     f.write(get_csv_header(config) + '\n')
100     # }
101
102     # set default pools size=1 to avoid 'unhealthy' issues
103     ctx.manager.set_pool_property('data', 'size', 1)
104     ctx.manager.set_pool_property('metadata', 'size', 1)
105     ctx.manager.set_pool_property('rbd', 'size', 1)
106
107     current_osds_out = 0
108
109     # sweep through all parameters
110     for osds_out, size, replica, rep in product(osds, sizes, replicas, reps):
111
112         osds_in = total_osds_in_cluster - osds_out
113
114         if osds_in == 0:
115             # we're done
116             break
117
118         if current_osds_out != osds_out:
119             # take an osd out
120             ctx.manager.raw_cluster_cmd(
121                 'osd', 'reweight', str(osds_out-1), '0.0')
122             wait_until_healthy(ctx, config)
123             current_osds_out = osds_out
124
125         if osds_in not in range(min_num_osds, (max_num_osds + 1)):
126             # no need to execute with a number of osds that wasn't requested
127             continue
128
129         if osds_in < replica:
130             # cannot execute with more replicas than available osds
131             continue
132
133         run_radosbench(ctx, config, f, osds_in, size, replica, rep)
134
135     f.close()
136
137     yield
138
139
140 def get_csv_header(conf):
141     all_columns = [
142         'rep', 'num_osd', 'num_replica', 'avg_throughput',
143         'avg_latency', 'stdev_throughput', 'stdev_latency'
144     ]
145     given_columns = conf.get('columns', None)
146     if given_columns and len(given_columns) != 0:
147         for column in given_columns:
148             if column not in all_columns:
149                 raise Exception('Unknown column ' + column)
150         return ','.join(conf['columns'])
151     else:
152         conf['columns'] = all_columns
153         return ','.join(all_columns)
154
155
156 def run_radosbench(ctx, config, f, num_osds, size, replica, rep):
157     pool = ctx.manager.create_pool_with_unique_name()
158
159     ctx.manager.set_pool_property(pool, 'size', replica)
160
161     wait_until_healthy(ctx, config)
162
163     log.info('Executing with parameters: ')
164     log.info('  num_osd =' + str(num_osds))
165     log.info('  size =' + str(size))
166     log.info('  num_replicas =' + str(replica))
167     log.info('  repetition =' + str(rep))
168
169     for role in config.get('clients', ['client.0']):
170         assert isinstance(role, basestring)
171         PREFIX = 'client.'
172         assert role.startswith(PREFIX)
173         id_ = role[len(PREFIX):]
174         (remote,) = ctx.cluster.only(role).remotes.iterkeys()
175
176         proc = remote.run(
177             args=[
178                 'adjust-ulimits',
179                 'ceph-coverage',
180                 '{}/archive/coverage'.format(teuthology.get_testdir(ctx)),
181                 'rados',
182                 '--no-log-to-stderr',
183                 '--name', role,
184                 '-b', str(size),
185                 '-p', pool,
186                 'bench', str(config.get('time', 120)), 'write',
187             ],
188             logger=log.getChild('radosbench.{id}'.format(id=id_)),
189             stdin=run.PIPE,
190             stdout=StringIO(),
191             wait=False
192         )
193
194         # parse output to get summary and format it as CSV
195         proc.wait()
196         out = proc.stdout.getvalue()
197         all_values = {
198             'stdev_throughput': re.sub(r'Stddev Bandwidth: ', '', re.search(
199                 r'Stddev Bandwidth:.*', out).group(0)),
200             'stdev_latency': re.sub(r'Stddev Latency: ', '', re.search(
201                 r'Stddev Latency:.*', out).group(0)),
202             'avg_throughput': re.sub(r'Bandwidth \(MB/sec\): ', '', re.search(
203                 r'Bandwidth \(MB/sec\):.*', out).group(0)),
204             'avg_latency': re.sub(r'Average Latency: ', '', re.search(
205                 r'Average Latency:.*', out).group(0)),
206             'rep': str(rep),
207             'num_osd': str(num_osds),
208             'num_replica': str(replica)
209         }
210         values_to_write = []
211         for column in config['columns']:
212             values_to_write.extend([all_values[column]])
213         f.write(','.join(values_to_write) + '\n')
214
215     ctx.manager.remove_pool(pool)
216
217
218 def wait_until_healthy(ctx, config):
219     first_mon = teuthology.get_first_mon(ctx, config)
220     (mon_remote,) = ctx.cluster.only(first_mon).remotes.iterkeys()
221     teuthology.wait_until_healthy(ctx, mon_remote)