initial code repo
[stor4nfv.git] / src / ceph / qa / tasks / mds_thrash.py
diff --git a/src/ceph/qa/tasks/mds_thrash.py b/src/ceph/qa/tasks/mds_thrash.py
new file mode 100644 (file)
index 0000000..75d236d
--- /dev/null
@@ -0,0 +1,555 @@
+"""
+Thrash mds by simulating failures
+"""
+import logging
+import contextlib
+import ceph_manager
+import itertools
+import random
+import signal
+import time
+
+from gevent import sleep
+from gevent.greenlet import Greenlet
+from gevent.event import Event
+from teuthology import misc as teuthology
+
+from tasks.cephfs.filesystem import MDSCluster, Filesystem
+
+log = logging.getLogger(__name__)
+
+class DaemonWatchdog(Greenlet):
+    """
+    DaemonWatchdog::
+
+    Watch Ceph daemons for failures. If an extended failure is detected (i.e.
+    not intentional), then the watchdog will unmount file systems and send
+    SIGTERM to all daemons. The duration of an extended failure is configurable
+    with watchdog_daemon_timeout.
+
+    watchdog_daemon_timeout [default: 300]: number of seconds a daemon
+        is allowed to be failed before the watchdog will bark.
+    """
+
+    def __init__(self, ctx, manager, config, thrashers):
+        Greenlet.__init__(self)
+        self.ctx = ctx
+        self.config = config
+        self.e = None
+        self.logger = log.getChild('daemon_watchdog')
+        self.manager = manager
+        self.name = 'watchdog'
+        self.stopping = Event()
+        self.thrashers = thrashers
+
+    def _run(self):
+        try:
+            self.watch()
+        except Exception as e:
+            # See _run exception comment for MDSThrasher
+            self.e = e
+            self.logger.exception("exception:")
+            # allow successful completion so gevent doesn't see an exception...
+
+    def log(self, x):
+        """Write data to logger"""
+        self.logger.info(x)
+
+    def stop(self):
+        self.stopping.set()
+
+    def bark(self):
+        self.log("BARK! unmounting mounts and killing all daemons")
+        for mount in self.ctx.mounts.values():
+            try:
+                mount.umount_wait(force=True)
+            except:
+                self.logger.exception("ignoring exception:")
+        daemons = []
+        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
+        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
+        for daemon in daemons:
+            try:
+                daemon.signal(signal.SIGTERM)
+            except:
+                self.logger.exception("ignoring exception:")
+
+    def watch(self):
+        self.log("watchdog starting")
+        daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
+        daemon_failure_time = {}
+        while not self.stopping.is_set():
+            bark = False
+            now = time.time()
+
+            mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
+            mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
+            clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)
+
+            #for daemon in mons:
+            #    self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
+            #for daemon in mdss:
+            #    self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
+
+            daemon_failures = []
+            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
+            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
+            for daemon in daemon_failures:
+                name = daemon.role + '.' + daemon.id_
+                dt = daemon_failure_time.setdefault(name, (daemon, now))
+                assert dt[0] is daemon
+                delta = now-dt[1]
+                self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
+                if delta > daemon_timeout:
+                    bark = True
+
+            # If a daemon is no longer failed, remove it from tracking:
+            for name in daemon_failure_time.keys():
+                if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
+                    self.log("daemon {name} has been restored".format(name=name))
+                    del daemon_failure_time[name]
+
+            for thrasher in self.thrashers:
+                if thrasher.e is not None:
+                    self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
+                    bark = True
+
+            if bark:
+                self.bark()
+                return
+
+            sleep(5)
+
+        self.log("watchdog finished")
+
+class MDSThrasher(Greenlet):
+    """
+    MDSThrasher::
+
+    The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).
+
+    The config is optional.  Many of the config parameters are a a maximum value
+    to use when selecting a random value from a range.  To always use the maximum
+    value, set no_random to true.  The config is a dict containing some or all of:
+
+    max_thrash: [default: 1] the maximum number of active MDSs per FS that will be thrashed at
+      any given time.
+
+    max_thrash_delay: [default: 30] maximum number of seconds to delay before
+      thrashing again.
+
+    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
+      the replay state before thrashing.
+
+    max_revive_delay: [default: 10] maximum number of seconds to delay before
+      bringing back a thrashed MDS.
+
+    randomize: [default: true] enables randomization and use the max/min values
+
+    seed: [no default] seed the random number generator
+
+    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
+      during replay.  Value should be between 0.0 and 1.0.
+
+    thrash_max_mds: [default: 0.05] likelihood that the max_mds of the mds
+      cluster will be modified to a value [1, current) or (current, starting
+      max_mds]. When reduced, randomly selected MDSs other than rank 0 will be
+      deactivated to reach the new max_mds.  Value should be between 0.0 and 1.0.
+
+    thrash_while_stopping: [default: false] thrash an MDS while there
+      are MDS in up:stopping (because max_mds was changed and some
+      MDS were deactivated).
+
+    thrash_weights: allows specific MDSs to be thrashed more/less frequently.
+      This option overrides anything specified by max_thrash.  This option is a
+      dict containing mds.x: weight pairs.  For example, [mds.a: 0.7, mds.b:
+      0.3, mds.c: 0.0].  Each weight is a value from 0.0 to 1.0.  Any MDSs not
+      specified will be automatically given a weight of 0.0 (not thrashed).
+      For a given MDS, by default the trasher delays for up to
+      max_thrash_delay, trashes, waits for the MDS to recover, and iterates.
+      If a non-zero weight is specified for an MDS, for each iteration the
+      thrasher chooses whether to thrash during that iteration based on a
+      random value [0-1] not exceeding the weight of that MDS.
+
+    Examples::
+
+
+      The following example sets the likelihood that mds.a will be thrashed
+      to 80%, mds.b to 20%, and other MDSs will not be thrashed.  It also sets the
+      likelihood that an MDS will be thrashed in replay to 40%.
+      Thrash weights do not have to sum to 1.
+
+      tasks:
+      - ceph:
+      - mds_thrash:
+          thrash_weights:
+            - mds.a: 0.8
+            - mds.b: 0.2
+          thrash_in_replay: 0.4
+      - ceph-fuse:
+      - workunit:
+          clients:
+            all: [suites/fsx.sh]
+
+      The following example disables randomization, and uses the max delay values:
+
+      tasks:
+      - ceph:
+      - mds_thrash:
+          max_thrash_delay: 10
+          max_revive_delay: 1
+          max_replay_thrash_delay: 4
+
+    """
+
+    def __init__(self, ctx, manager, config, fs, max_mds):
+        Greenlet.__init__(self)
+
+        self.config = config
+        self.ctx = ctx
+        self.e = None
+        self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
+        self.fs = fs
+        self.manager = manager
+        self.max_mds = max_mds
+        self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
+        self.stopping = Event()
+
+        self.randomize = bool(self.config.get('randomize', True))
+        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
+        self.max_thrash = int(self.config.get('max_thrash', 1))
+        self.max_thrash_delay = float(self.config.get('thrash_delay', 120.0))
+        self.thrash_in_replay = float(self.config.get('thrash_in_replay', False))
+        assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
+            v=self.thrash_in_replay)
+        self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
+        self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
+
+    def _run(self):
+        try:
+            self.do_thrash()
+        except Exception as e:
+            # Log exceptions here so we get the full backtrace (gevent loses them).
+            # Also allow succesful completion as gevent exception handling is a broken mess:
+            #
+            # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
+            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
+            #     self.print_exception(context, type, value, tb)
+            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
+            #     traceback.print_exception(type, value, tb, file=errstream)
+            #   File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
+            #     _print(file, 'Traceback (most recent call last):')
+            #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
+            #     file.write(str+terminator)
+            # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
+            self.e = e
+            self.logger.exception("exception:")
+            # allow successful completion so gevent doesn't see an exception...
+
+    def log(self, x):
+        """Write data to logger assigned to this MDThrasher"""
+        self.logger.info(x)
+
+    def stop(self):
+        self.stopping.set()
+
+    def kill_mds(self, mds):
+        if self.config.get('powercycle'):
+            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
+                         remotes.iterkeys())
+            self.log('kill_mds on mds.{m} doing powercycle of {s}'.
+                     format(m=mds, s=remote.name))
+            self._assert_ipmi(remote)
+            remote.console.power_off()
+        else:
+            self.ctx.daemons.get_daemon('mds', mds).stop()
+
+    @staticmethod
+    def _assert_ipmi(remote):
+        assert remote.console.has_ipmi_credentials, (
+            "powercycling requested but RemoteConsole is not "
+            "initialized.  Check ipmi config.")
+
+    def revive_mds(self, mds, standby_for_rank=None):
+        """
+        Revive mds -- do an ipmpi powercycle (if indicated by the config)
+        and then restart (using --hot-standby if specified.
+        """
+        if self.config.get('powercycle'):
+            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
+                         remotes.iterkeys())
+            self.log('revive_mds on mds.{m} doing powercycle of {s}'.
+                     format(m=mds, s=remote.name))
+            self._assert_ipmi(remote)
+            remote.console.power_on()
+            self.manager.make_admin_daemon_dir(self.ctx, remote)
+        args = []
+        if standby_for_rank:
+            args.extend(['--hot-standby', standby_for_rank])
+        self.ctx.daemons.get_daemon('mds', mds).restart(*args)
+
+    def wait_for_stable(self, rank = None, gid = None):
+        self.log('waiting for mds cluster to stabilize...')
+        for itercount in itertools.count():
+            status = self.fs.status()
+            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
+            ranks = list(status.get_ranks(self.fs.id))
+            stopping = filter(lambda info: "up:stopping" == info['state'], ranks)
+            actives = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, ranks)
+
+            if not bool(self.config.get('thrash_while_stopping', False)) and len(stopping) > 0:
+                if itercount % 5 == 0:
+                    self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
+            else:
+                if rank is not None:
+                    try:
+                        info = status.get_rank(self.fs.id, rank)
+                        if info['gid'] != gid and "up:active" == info['state']:
+                            self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
+                            return status
+                    except:
+                        pass # no rank present
+                    if len(actives) >= max_mds:
+                        # no replacement can occur!
+                        self.log("cluster has %d actives (max_mds is %d), no MDS can replace rank %d".format(len(actives), max_mds, rank))
+                        return status
+                else:
+                    if len(actives) >= max_mds:
+                        self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
+                        return status, None
+            if itercount > 300/2: # 5 minutes
+                 raise RuntimeError('timeout waiting for cluster to stabilize')
+            elif itercount % 5 == 0:
+                self.log('mds map: {status}'.format(status=status))
+            else:
+                self.log('no change')
+            sleep(2)
+
+    def do_thrash(self):
+        """
+        Perform the random thrashing action
+        """
+
+        self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
+        stats = {
+            "max_mds": 0,
+            "deactivate": 0,
+            "kill": 0,
+        }
+
+        while not self.stopping.is_set():
+            delay = self.max_thrash_delay
+            if self.randomize:
+                delay = random.randrange(0.0, self.max_thrash_delay)
+
+            if delay > 0.0:
+                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
+                self.stopping.wait(delay)
+                if self.stopping.is_set():
+                    continue
+
+            status = self.fs.status()
+
+            if random.random() <= self.thrash_max_mds:
+                max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
+                options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
+                if len(options) > 0:
+                    sample = random.sample(options, 1)
+                    new_max_mds = sample[0]
+                    self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
+                    self.fs.set_max_mds(new_max_mds)
+                    stats['max_mds'] += 1
+
+                    targets = filter(lambda r: r['rank'] >= new_max_mds, status.get_ranks(self.fs.id))
+                    if len(targets) > 0:
+                        # deactivate mds in decending order
+                        targets = sorted(targets, key=lambda r: r['rank'], reverse=True)
+                        for target in targets:
+                            self.log("deactivating rank %d" % target['rank'])
+                            self.fs.deactivate(target['rank'])
+                            stats['deactivate'] += 1
+                            status = self.wait_for_stable()[0]
+                    else:
+                        status = self.wait_for_stable()[0]
+
+            count = 0
+            for info in status.get_ranks(self.fs.id):
+                name = info['name']
+                label = 'mds.' + name
+                rank = info['rank']
+                gid = info['gid']
+
+                # if thrash_weights isn't specified and we've reached max_thrash,
+                # we're done
+                count = count + 1
+                if 'thrash_weights' not in self.config and count > self.max_thrash:
+                    break
+
+                weight = 1.0
+                if 'thrash_weights' in self.config:
+                    weight = self.config['thrash_weights'].get(label, '0.0')
+                skip = random.randrange(0.0, 1.0)
+                if weight <= skip:
+                    self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=weight))
+                    continue
+
+                self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
+                self.kill_mds(name)
+                stats['kill'] += 1
+
+                # wait for mon to report killed mds as crashed
+                last_laggy_since = None
+                itercount = 0
+                while True:
+                    status = self.fs.status()
+                    info = status.get_mds(name)
+                    if not info:
+                        break
+                    if 'laggy_since' in info:
+                        last_laggy_since = info['laggy_since']
+                        break
+                    if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
+                        break
+                    self.log(
+                        'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
+                            label=label))
+                    itercount = itercount + 1
+                    if itercount > 10:
+                        self.log('mds map: {status}'.format(status=status))
+                    sleep(2)
+
+                if last_laggy_since:
+                    self.log(
+                        '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
+                else:
+                    self.log('{label} down, removed from mdsmap'.format(label=label, since=last_laggy_since))
+
+                # wait for a standby mds to takeover and become active
+                status = self.wait_for_stable(rank, gid)
+
+                # wait for a while before restarting old active to become new
+                # standby
+                delay = self.max_revive_delay
+                if self.randomize:
+                    delay = random.randrange(0.0, self.max_revive_delay)
+
+                self.log('waiting for {delay} secs before reviving {label}'.format(
+                    delay=delay, label=label))
+                sleep(delay)
+
+                self.log('reviving {label}'.format(label=label))
+                self.revive_mds(name)
+
+                for itercount in itertools.count():
+                    if itercount > 300/2: # 5 minutes
+                        raise RuntimeError('timeout waiting for MDS to revive')
+                    status = self.fs.status()
+                    info = status.get_mds(name)
+                    if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
+                        self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
+                        break
+                    self.log(
+                        'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
+                    sleep(2)
+
+        for stat in stats:
+            self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))
+
+             # don't do replay thrashing right now
+#            for info in status.get_replays(self.fs.id):
+#                # this might race with replay -> active transition...
+#                if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
+#                    delay = self.max_replay_thrash_delay
+#                    if self.randomize:
+#                        delay = random.randrange(0.0, self.max_replay_thrash_delay)
+#                sleep(delay)
+#                self.log('kill replaying mds.{id}'.format(id=self.to_kill))
+#                self.kill_mds(self.to_kill)
+#
+#                delay = self.max_revive_delay
+#                if self.randomize:
+#                    delay = random.randrange(0.0, self.max_revive_delay)
+#
+#                self.log('waiting for {delay} secs before reviving mds.{id}'.format(
+#                    delay=delay, id=self.to_kill))
+#                sleep(delay)
+#
+#                self.log('revive mds.{id}'.format(id=self.to_kill))
+#                self.revive_mds(self.to_kill)
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Stress test the mds by thrashing while another task/workunit
+    is running.
+
+    Please refer to MDSThrasher class for further information on the
+    available options.
+    """
+
+    mds_cluster = MDSCluster(ctx)
+
+    if config is None:
+        config = {}
+    assert isinstance(config, dict), \
+        'mds_thrash task only accepts a dict for configuration'
+    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
+    assert len(mdslist) > 1, \
+        'mds_thrash task requires at least 2 metadata servers'
+
+    # choose random seed
+    if 'seed' in config:
+        seed = int(config['seed'])
+    else:
+        seed = int(time.time())
+    log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
+    random.seed(seed)
+
+    (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
+    manager = ceph_manager.CephManager(
+        first, ctx=ctx, logger=log.getChild('ceph_manager'),
+    )
+
+    # make sure everyone is in active, standby, or standby-replay
+    log.info('Wait for all MDSs to reach steady state...')
+    status = mds_cluster.status()
+    while True:
+        steady = True
+        for info in status.get_all():
+            state = info['state']
+            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
+                steady = False
+                break
+        if steady:
+            break
+        sleep(2)
+        status = mds_cluster.status()
+    log.info('Ready to start thrashing')
+
+    thrashers = []
+
+    watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
+    watchdog.start()
+
+    manager.wait_for_clean()
+    assert manager.is_clean()
+    for fs in status.get_filesystems():
+        thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
+        thrasher.start()
+        thrashers.append(thrasher)
+
+    try:
+        log.debug('Yielding')
+        yield
+    finally:
+        log.info('joining mds_thrashers')
+        for thrasher in thrashers:
+            thrasher.stop()
+            if thrasher.e:
+                raise RuntimeError('error during thrashing')
+            thrasher.join()
+        log.info('done joining')
+
+        watchdog.stop()
+        watchdog.join()