X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fqa%2Ftasks%2Frecovery_bench.py;fp=src%2Fceph%2Fqa%2Ftasks%2Frecovery_bench.py;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=5eb9fd21d46caf4dc9fdcae0281d8d3c3b12b84f;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/qa/tasks/recovery_bench.py b/src/ceph/qa/tasks/recovery_bench.py deleted file mode 100644 index 5eb9fd2..0000000 --- a/src/ceph/qa/tasks/recovery_bench.py +++ /dev/null @@ -1,208 +0,0 @@ -""" -Recovery system benchmarking -""" -from cStringIO import StringIO - -import contextlib -import gevent -import json -import logging -import random -import time - -import ceph_manager -from teuthology import misc as teuthology - -log = logging.getLogger(__name__) - -@contextlib.contextmanager -def task(ctx, config): - """ - Benchmark the recovery system. - - Generates objects with smalliobench, runs it normally to get a - baseline performance measurement, then marks an OSD out and reruns - to measure performance during recovery. - - The config should be as follows: - - recovery_bench: - duration: - num_objects: - io_size: - - example: - - tasks: - - ceph: - - recovery_bench: - duration: 60 - num_objects: 500 - io_size: 4096 - """ - if config is None: - config = {} - assert isinstance(config, dict), \ - 'recovery_bench task only accepts a dict for configuration' - - log.info('Beginning recovery bench...') - - first_mon = teuthology.get_first_mon(ctx, config) - (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys() - - manager = ceph_manager.CephManager( - mon, - ctx=ctx, - logger=log.getChild('ceph_manager'), - ) - - num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd') - while len(manager.get_osd_status()['up']) < num_osds: - time.sleep(10) - - bench_proc = RecoveryBencher( - manager, - config, - ) - try: - yield - finally: - log.info('joining recovery bencher') - bench_proc.do_join() - -class RecoveryBencher: - """ - RecoveryBencher - """ - def __init__(self, manager, config): - self.ceph_manager = manager - self.ceph_manager.wait_for_clean() - - osd_status = self.ceph_manager.get_osd_status() - self.osds = osd_status['up'] - - self.config = config - if self.config is None: - self.config = dict() - - else: - def tmp(x): - """ - Local wrapper to print value. - """ - print x - self.log = tmp - - log.info("spawning thread") - - self.thread = gevent.spawn(self.do_bench) - - def do_join(self): - """ - Join the recovery bencher. This is called after the main - task exits. - """ - self.thread.get() - - def do_bench(self): - """ - Do the benchmarking. - """ - duration = self.config.get("duration", 60) - num_objects = self.config.get("num_objects", 500) - io_size = self.config.get("io_size", 4096) - - osd = str(random.choice(self.osds)) - (osd_remote,) = self.ceph_manager.ctx.cluster.only('osd.%s' % osd).remotes.iterkeys() - - testdir = teuthology.get_testdir(self.ceph_manager.ctx) - - # create the objects - osd_remote.run( - args=[ - 'adjust-ulimits', - 'ceph-coverage', - '{tdir}/archive/coverage'.format(tdir=testdir), - 'smalliobench'.format(tdir=testdir), - '--use-prefix', 'recovery_bench', - '--init-only', '1', - '--num-objects', str(num_objects), - '--io-size', str(io_size), - ], - wait=True, - ) - - # baseline bench - log.info('non-recovery (baseline)') - p = osd_remote.run( - args=[ - 'adjust-ulimits', - 'ceph-coverage', - '{tdir}/archive/coverage'.format(tdir=testdir), - 'smalliobench', - '--use-prefix', 'recovery_bench', - '--do-not-init', '1', - '--duration', str(duration), - '--io-size', str(io_size), - ], - stdout=StringIO(), - stderr=StringIO(), - wait=True, - ) - self.process_samples(p.stderr.getvalue()) - - self.ceph_manager.raw_cluster_cmd('osd', 'out', osd) - time.sleep(5) - - # recovery bench - log.info('recovery active') - p = osd_remote.run( - args=[ - 'adjust-ulimits', - 'ceph-coverage', - '{tdir}/archive/coverage'.format(tdir=testdir), - 'smalliobench', - '--use-prefix', 'recovery_bench', - '--do-not-init', '1', - '--duration', str(duration), - '--io-size', str(io_size), - ], - stdout=StringIO(), - stderr=StringIO(), - wait=True, - ) - self.process_samples(p.stderr.getvalue()) - - self.ceph_manager.raw_cluster_cmd('osd', 'in', osd) - - def process_samples(self, input): - """ - Extract samples from the input and process the results - - :param input: input lines in JSON format - """ - lat = {} - for line in input.split('\n'): - try: - sample = json.loads(line) - samples = lat.setdefault(sample['type'], []) - samples.append(float(sample['latency'])) - except Exception: - pass - - for type in lat: - samples = lat[type] - samples.sort() - - num = len(samples) - - # median - if num & 1 == 1: # odd number of samples - median = samples[num / 2] - else: - median = (samples[num / 2] + samples[num / 2 - 1]) / 2 - - # 99% - ninety_nine = samples[int(num * 0.99)] - - log.info("%s: median %f, 99%% %f" % (type, median, ninety_nine))