X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fqa%2Ftasks%2Frecovery_bench.py;fp=src%2Fceph%2Fqa%2Ftasks%2Frecovery_bench.py;h=5eb9fd21d46caf4dc9fdcae0281d8d3c3b12b84f;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/qa/tasks/recovery_bench.py b/src/ceph/qa/tasks/recovery_bench.py new file mode 100644 index 0000000..5eb9fd2 --- /dev/null +++ b/src/ceph/qa/tasks/recovery_bench.py @@ -0,0 +1,208 @@ +""" +Recovery system benchmarking +""" +from cStringIO import StringIO + +import contextlib +import gevent +import json +import logging +import random +import time + +import ceph_manager +from teuthology import misc as teuthology + +log = logging.getLogger(__name__) + +@contextlib.contextmanager +def task(ctx, config): + """ + Benchmark the recovery system. + + Generates objects with smalliobench, runs it normally to get a + baseline performance measurement, then marks an OSD out and reruns + to measure performance during recovery. + + The config should be as follows: + + recovery_bench: + duration: + num_objects: + io_size: + + example: + + tasks: + - ceph: + - recovery_bench: + duration: 60 + num_objects: 500 + io_size: 4096 + """ + if config is None: + config = {} + assert isinstance(config, dict), \ + 'recovery_bench task only accepts a dict for configuration' + + log.info('Beginning recovery bench...') + + first_mon = teuthology.get_first_mon(ctx, config) + (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys() + + manager = ceph_manager.CephManager( + mon, + ctx=ctx, + logger=log.getChild('ceph_manager'), + ) + + num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd') + while len(manager.get_osd_status()['up']) < num_osds: + time.sleep(10) + + bench_proc = RecoveryBencher( + manager, + config, + ) + try: + yield + finally: + log.info('joining recovery bencher') + bench_proc.do_join() + +class RecoveryBencher: + """ + RecoveryBencher + """ + def __init__(self, manager, config): + self.ceph_manager = manager + self.ceph_manager.wait_for_clean() + + osd_status = self.ceph_manager.get_osd_status() + self.osds = osd_status['up'] + + self.config = config + if self.config is None: + self.config = dict() + + else: + def tmp(x): + """ + Local wrapper to print value. + """ + print x + self.log = tmp + + log.info("spawning thread") + + self.thread = gevent.spawn(self.do_bench) + + def do_join(self): + """ + Join the recovery bencher. This is called after the main + task exits. + """ + self.thread.get() + + def do_bench(self): + """ + Do the benchmarking. + """ + duration = self.config.get("duration", 60) + num_objects = self.config.get("num_objects", 500) + io_size = self.config.get("io_size", 4096) + + osd = str(random.choice(self.osds)) + (osd_remote,) = self.ceph_manager.ctx.cluster.only('osd.%s' % osd).remotes.iterkeys() + + testdir = teuthology.get_testdir(self.ceph_manager.ctx) + + # create the objects + osd_remote.run( + args=[ + 'adjust-ulimits', + 'ceph-coverage', + '{tdir}/archive/coverage'.format(tdir=testdir), + 'smalliobench'.format(tdir=testdir), + '--use-prefix', 'recovery_bench', + '--init-only', '1', + '--num-objects', str(num_objects), + '--io-size', str(io_size), + ], + wait=True, + ) + + # baseline bench + log.info('non-recovery (baseline)') + p = osd_remote.run( + args=[ + 'adjust-ulimits', + 'ceph-coverage', + '{tdir}/archive/coverage'.format(tdir=testdir), + 'smalliobench', + '--use-prefix', 'recovery_bench', + '--do-not-init', '1', + '--duration', str(duration), + '--io-size', str(io_size), + ], + stdout=StringIO(), + stderr=StringIO(), + wait=True, + ) + self.process_samples(p.stderr.getvalue()) + + self.ceph_manager.raw_cluster_cmd('osd', 'out', osd) + time.sleep(5) + + # recovery bench + log.info('recovery active') + p = osd_remote.run( + args=[ + 'adjust-ulimits', + 'ceph-coverage', + '{tdir}/archive/coverage'.format(tdir=testdir), + 'smalliobench', + '--use-prefix', 'recovery_bench', + '--do-not-init', '1', + '--duration', str(duration), + '--io-size', str(io_size), + ], + stdout=StringIO(), + stderr=StringIO(), + wait=True, + ) + self.process_samples(p.stderr.getvalue()) + + self.ceph_manager.raw_cluster_cmd('osd', 'in', osd) + + def process_samples(self, input): + """ + Extract samples from the input and process the results + + :param input: input lines in JSON format + """ + lat = {} + for line in input.split('\n'): + try: + sample = json.loads(line) + samples = lat.setdefault(sample['type'], []) + samples.append(float(sample['latency'])) + except Exception: + pass + + for type in lat: + samples = lat[type] + samples.sort() + + num = len(samples) + + # median + if num & 1 == 1: # odd number of samples + median = samples[num / 2] + else: + median = (samples[num / 2] + samples[num / 2 - 1]) / 2 + + # 99% + ninety_nine = samples[int(num * 0.99)] + + log.info("%s: median %f, 99%% %f" % (type, median, ninety_nine))