X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fqa%2Ftasks%2Fmds_thrash.py;fp=src%2Fceph%2Fqa%2Ftasks%2Fmds_thrash.py;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=75d236d105e2cd4fae143bfaa986a851809aeaa2;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git

diff --git a/src/ceph/qa/tasks/mds_thrash.py b/src/ceph/qa/tasks/mds_thrash.py
deleted file mode 100644
index 75d236d..0000000
--- a/src/ceph/qa/tasks/mds_thrash.py
+++ /dev/null
@@ -1,555 +0,0 @@

"""
Thrash mds by simulating failures
"""
import logging
import contextlib
import ceph_manager
import itertools
import random
import signal
import time

from gevent import sleep
from gevent.greenlet import Greenlet
from gevent.event import Event
from teuthology import misc as teuthology

from tasks.cephfs.filesystem import MDSCluster, Filesystem

log = logging.getLogger(__name__)

class DaemonWatchdog(Greenlet):
    """
    DaemonWatchdog::

    Watch Ceph daemons for failures. If an extended failure is detected (i.e.
    not intentional), then the watchdog will unmount file systems and send
    SIGTERM to all daemons. The duration of an extended failure is configurable
    with watchdog_daemon_timeout.

    watchdog_daemon_timeout [default: 300]: number of seconds a daemon
    is allowed to be failed before the watchdog will bark.
    """
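    # The timeout is read from the same task config dict that task() below
    # passes in, so it can be tuned per job. For example (the value here is
    # purely illustrative), the following fragment would give failed daemons
    # ten minutes before the watchdog barks:
    #
    #   tasks:
    #   - ceph:
    #   - mds_thrash:
    #       watchdog_daemon_timeout: 600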
    def __init__(self, ctx, manager, config, thrashers):
        Greenlet.__init__(self)
        self.ctx = ctx
        self.config = config
        self.e = None
        self.logger = log.getChild('daemon_watchdog')
        self.manager = manager
        self.name = 'watchdog'
        self.stopping = Event()
        self.thrashers = thrashers

    def _run(self):
        try:
            self.watch()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.e = e
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to logger"""
        self.logger.info(x)

    def stop(self):
        self.stopping.set()

    def bark(self):
        self.log("BARK! unmounting mounts and killing all daemons")
        for mount in self.ctx.mounts.values():
            try:
                mount.umount_wait(force=True)
            except:
                self.logger.exception("ignoring exception:")
        daemons = []
        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
        for daemon in daemons:
            try:
                daemon.signal(signal.SIGTERM)
            except:
                self.logger.exception("ignoring exception:")

    def watch(self):
        self.log("watchdog starting")
        daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
        daemon_failure_time = {}
        while not self.stopping.is_set():
            bark = False
            now = time.time()

            mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
            mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
            clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)

            #for daemon in mons:
            #    self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
            #for daemon in mdss:
            #    self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))

            daemon_failures = []
            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
            for daemon in daemon_failures:
                name = daemon.role + '.' + daemon.id_
                dt = daemon_failure_time.setdefault(name, (daemon, now))
                assert dt[0] is daemon
                delta = now - dt[1]
                self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
                if delta > daemon_timeout:
                    bark = True

            # If a daemon is no longer failed, remove it from tracking:
            for name in daemon_failure_time.keys():
                if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
                    self.log("daemon {name} has been restored".format(name=name))
                    del daemon_failure_time[name]

            for thrasher in self.thrashers:
                if thrasher.e is not None:
                    self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
                    bark = True

            if bark:
                self.bark()
                return

            sleep(5)

        self.log("watchdog finished")

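# Both DaemonWatchdog above and MDSThrasher below are driven by the same
# mds_thrash task config dict, so their options can be combined in one job
# fragment, for example (values are illustrative only):
#
#   tasks:
#   - ceph:
#   - mds_thrash:
#       max_thrash: 2
#       thrash_max_mds: 0.1
#       thrash_while_stopping: true
#       watchdog_daemon_timeout: 600
#   - ceph-fuse:
#   - workunit:
#       clients:
#         all: [suites/fsx.sh]
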
class MDSThrasher(Greenlet):
    """
    MDSThrasher::

    The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).

    The config is optional. Many of the config parameters are a maximum value
    to use when selecting a random value from a range. To always use the
    maximum value, set randomize to false. The config is a dict containing
    some or all of:

    max_thrash: [default: 1] the maximum number of active MDSs per FS that will
      be thrashed at any given time.

    max_thrash_delay: [default: 120] maximum number of seconds to delay before
      thrashing again.

    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
      the replay state before thrashing.

    max_revive_delay: [default: 10] maximum number of seconds to delay before
      bringing back a thrashed MDS.

    randomize: [default: true] enables randomization of the delays; when false,
      the maximum values are used directly.

    seed: [no default] seed the random number generator

    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
      during replay. Value should be between 0.0 and 1.0.

    thrash_max_mds: [default: 0.05] likelihood that the max_mds of the mds
      cluster will be modified to a value [1, current) or (current, starting
      max_mds]. When reduced, randomly selected MDSs other than rank 0 will be
      deactivated to reach the new max_mds. Value should be between 0.0 and 1.0.

    thrash_while_stopping: [default: false] thrash an MDS while there
      are MDSs in up:stopping (because max_mds was changed and some
      MDSs were deactivated).

    thrash_weights: allows specific MDSs to be thrashed more/less frequently.
      This option overrides anything specified by max_thrash. This option is a
      dict containing mds.x: weight pairs. For example, [mds.a: 0.7, mds.b:
      0.3, mds.c: 0.0]. Each weight is a value from 0.0 to 1.0. Any MDSs not
      specified will be automatically given a weight of 0.0 (not thrashed).
      For a given MDS, by default the thrasher delays for up to
      max_thrash_delay, thrashes, waits for the MDS to recover, and iterates.
      If a non-zero weight is specified for an MDS, for each iteration the
      thrasher chooses whether to thrash during that iteration based on a
      random value [0-1] not exceeding the weight of that MDS.

    Examples::

      The following example sets the likelihood that mds.a will be thrashed
      to 80%, mds.b to 20%, and other MDSs will not be thrashed. It also sets
      the likelihood that an MDS will be thrashed in replay to 40%.
      Thrash weights do not have to sum to 1.

      tasks:
      - ceph:
      - mds_thrash:
          thrash_weights:
            - mds.a: 0.8
            - mds.b: 0.2
          thrash_in_replay: 0.4
      - ceph-fuse:
      - workunit:
          clients:
            all: [suites/fsx.sh]

      The following example disables randomization and uses the max delay
      values directly:

      tasks:
      - ceph:
      - mds_thrash:
          randomize: false
          max_thrash_delay: 10
          max_revive_delay: 1
          max_replay_thrash_delay: 4

    """

    def __init__(self, ctx, manager, config, fs, max_mds):
        Greenlet.__init__(self)

        self.config = config
        self.ctx = ctx
        self.e = None
        self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
        self.fs = fs
        self.manager = manager
        self.max_mds = max_mds
        self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
        self.stopping = Event()

        self.randomize = bool(self.config.get('randomize', True))
        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
        self.max_thrash = int(self.config.get('max_thrash', 1))
        self.max_thrash_delay = float(self.config.get('max_thrash_delay', 120.0))
        self.thrash_in_replay = float(self.config.get('thrash_in_replay', 0.0))
        assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
            v=self.thrash_in_replay)
        self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
        self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
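    # Taken together, the defaults above are roughly equivalent to running the
    # task with this config (illustrative restatement only; the values mirror
    # the constructor defaults):
    #
    #   mds_thrash:
    #     randomize: true
    #     thrash_max_mds: 0.05
    #     max_thrash: 1
    #     max_thrash_delay: 120
    #     thrash_in_replay: 0.0
    #     max_replay_thrash_delay: 4
    #     max_revive_delay: 10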
    def _run(self):
        try:
            self.do_thrash()
        except Exception as e:
            # Log exceptions here so we get the full backtrace (gevent loses them).
            # Also allow successful completion as gevent exception handling is a broken mess:
            #
            # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
            #     self.print_exception(context, type, value, tb)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
            #     traceback.print_exception(type, value, tb, file=errstream)
            #   File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
            #     _print(file, 'Traceback (most recent call last):')
            #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
            #     file.write(str+terminator)
            # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
            self.e = e
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to logger assigned to this MDSThrasher"""
        self.logger.info(x)

    def stop(self):
        self.stopping.set()

    def kill_mds(self, mds):
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.iterkeys())
            self.log('kill_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mds', mds).stop()

    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")

    def revive_mds(self, mds, standby_for_rank=None):
        """
        Revive mds -- do an ipmi powercycle (if indicated by the config)
        and then restart (using --hot-standby if specified).
        """
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.iterkeys())
            self.log('revive_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.manager.make_admin_daemon_dir(self.ctx, remote)
        args = []
        if standby_for_rank:
            args.extend(['--hot-standby', standby_for_rank])
        self.ctx.daemons.get_daemon('mds', mds).restart(*args)

    def wait_for_stable(self, rank = None, gid = None):
        self.log('waiting for mds cluster to stabilize...')
        for itercount in itertools.count():
            status = self.fs.status()
            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
            ranks = list(status.get_ranks(self.fs.id))
            stopping = filter(lambda info: "up:stopping" == info['state'], ranks)
            actives = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, ranks)

            if not bool(self.config.get('thrash_while_stopping', False)) and len(stopping) > 0:
                if itercount % 5 == 0:
                    self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
            else:
                if rank is not None:
                    try:
                        info = status.get_rank(self.fs.id, rank)
                        if info['gid'] != gid and "up:active" == info['state']:
                            self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
                            return status
                    except:
                        pass # no rank present
                    if len(actives) >= max_mds:
                        # no replacement can occur!
                        self.log("cluster has {actives} actives (max_mds is {max_mds}), no MDS can replace rank {rank}".format(
                            actives=len(actives), max_mds=max_mds, rank=rank))
                        return status
                else:
                    if len(actives) >= max_mds:
                        self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
                        return status, None
            if itercount > 300/2: # 5 minutes
                raise RuntimeError('timeout waiting for cluster to stabilize')
            elif itercount % 5 == 0:
                self.log('mds map: {status}'.format(status=status))
            else:
                self.log('no change')
            sleep(2)
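    # wait_for_stable() has two return shapes: called with a rank/gid it
    # returns the bare status once a replacement is active (or none can
    # occur), while called with no arguments it returns a (status, None)
    # tuple. That is why do_thrash() below uses both of these forms:
    #
    #   status = self.wait_for_stable()[0]
    #   status = self.wait_for_stable(rank, gid)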
    def do_thrash(self):
        """
        Perform the random thrashing action
        """

        self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
        stats = {
            "max_mds": 0,
            "deactivate": 0,
            "kill": 0,
        }

        while not self.stopping.is_set():
            delay = self.max_thrash_delay
            if self.randomize:
                delay = random.uniform(0.0, self.max_thrash_delay)

            if delay > 0.0:
                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
                self.stopping.wait(delay)
                if self.stopping.is_set():
                    continue

            status = self.fs.status()

            if random.random() <= self.thrash_max_mds:
                max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
                options = range(1, max_mds) + range(max_mds + 1, self.max_mds + 1)
                if len(options) > 0:
                    sample = random.sample(options, 1)
                    new_max_mds = sample[0]
                    self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
                    self.fs.set_max_mds(new_max_mds)
                    stats['max_mds'] += 1

                    targets = filter(lambda r: r['rank'] >= new_max_mds, status.get_ranks(self.fs.id))
                    if len(targets) > 0:
                        # deactivate mds in descending order
                        targets = sorted(targets, key=lambda r: r['rank'], reverse=True)
                        for target in targets:
                            self.log("deactivating rank %d" % target['rank'])
                            self.fs.deactivate(target['rank'])
                            stats['deactivate'] += 1
                            status = self.wait_for_stable()[0]
                    else:
                        status = self.wait_for_stable()[0]
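            # Pick victims: walk the active ranks and kill up to max_thrash of
            # them per pass, or, when thrash_weights is configured, thrash each
            # MDS with its configured probability. For example (illustrative),
            # with
            #
            #   thrash_weights:
            #     - mds.a: 0.8
            #
            # mds.a is killed on roughly 80% of the iterations and every other
            # MDS is left alone.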
            count = 0
            for info in status.get_ranks(self.fs.id):
                name = info['name']
                label = 'mds.' + name
                rank = info['rank']
                gid = info['gid']

                # if thrash_weights isn't specified and we've reached max_thrash,
                # we're done
                count = count + 1
                if 'thrash_weights' not in self.config and count > self.max_thrash:
                    break

                weight = 1.0
                if 'thrash_weights' in self.config:
                    weight = self.config['thrash_weights'].get(label, 0.0)
                skip = random.random()
                if weight <= skip:
                    self.log('skipping thrash iteration with skip ({skip}) >= weight ({weight})'.format(skip=skip, weight=weight))
                    continue

                self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
                self.kill_mds(name)
                stats['kill'] += 1

                # wait for mon to report killed mds as crashed
                last_laggy_since = None
                itercount = 0
                while True:
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if not info:
                        break
                    if 'laggy_since' in info:
                        last_laggy_since = info['laggy_since']
                        break
                    if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
                        break
                    self.log(
                        'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
                            label=label))
                    itercount = itercount + 1
                    if itercount > 10:
                        self.log('mds map: {status}'.format(status=status))
                    sleep(2)

                if last_laggy_since:
                    self.log(
                        '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
                else:
                    self.log('{label} down, removed from mdsmap'.format(label=label))

                # wait for a standby mds to take over and become active
                status = self.wait_for_stable(rank, gid)

                # wait for a while before restarting old active to become new
                # standby
                delay = self.max_revive_delay
                if self.randomize:
                    delay = random.uniform(0.0, self.max_revive_delay)

                self.log('waiting for {delay} secs before reviving {label}'.format(
                    delay=delay, label=label))
                sleep(delay)

                self.log('reviving {label}'.format(label=label))
                self.revive_mds(name)

                for itercount in itertools.count():
                    if itercount > 300/2: # 5 minutes
                        raise RuntimeError('timeout waiting for MDS to revive')
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
                        self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
                        break
                    self.log(
                        'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
                    sleep(2)

            for stat in stats:
                self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))

            # don't do replay thrashing right now
#            for info in status.get_replays(self.fs.id):
#                # this might race with replay -> active transition...
#                if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
#                    delay = self.max_replay_thrash_delay
#                    if self.randomize:
#                        delay = random.randrange(0.0, self.max_replay_thrash_delay)
#                    sleep(delay)
#                    self.log('kill replaying mds.{id}'.format(id=self.to_kill))
#                    self.kill_mds(self.to_kill)
#
#                    delay = self.max_revive_delay
#                    if self.randomize:
#                        delay = random.randrange(0.0, self.max_revive_delay)
#
#                    self.log('waiting for {delay} secs before reviving mds.{id}'.format(
#                        delay=delay, id=self.to_kill))
#                    sleep(delay)
#
#                    self.log('revive mds.{id}'.format(id=self.to_kill))
#                    self.revive_mds(self.to_kill)

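# Note that the replay-thrashing block at the end of do_thrash() above is
# commented out, so the thrash_in_replay and max_replay_thrash_delay options
# are accepted and validated but currently have no effect.
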
@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by thrashing while another task/workunit
    is running.

    Please refer to MDSThrasher class for further information on the
    available options.
    """

    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'mds_thrash task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 1, \
        'mds_thrash task requires at least 2 metadata servers'

    # choose random seed
    if 'seed' in config:
        seed = int(config['seed'])
    else:
        seed = int(time.time())
    log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
    random.seed(seed)

    (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    status = mds_cluster.status()
    while True:
        steady = True
        for info in status.get_all():
            state = info['state']
            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
                steady = False
                break
        if steady:
            break
        sleep(2)
        status = mds_cluster.status()
    log.info('Ready to start thrashing')

    thrashers = []

    watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
    watchdog.start()

    manager.wait_for_clean()
    assert manager.is_clean()
    for fs in status.get_filesystems():
        thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
        thrasher.start()
        thrashers.append(thrasher)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining mds_thrashers')
        for thrasher in thrashers:
            thrasher.stop()
            if thrasher.e:
                raise RuntimeError('error during thrashing')
            thrasher.join()
        log.info('done joining')

        watchdog.stop()
        watchdog.join()
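
# With no options at all, the task falls back to the defaults documented in
# MDSThrasher and DaemonWatchdog above; a minimal job fragment (illustrative)
# is simply:
#
#   tasks:
#   - ceph:
#   - mds_thrash:
#   - ceph-fuse:
#   - workunit:
#       clients:
#         all: [suites/fsx.sh]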