Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / mds_thrash.py
1 """
2 Thrash mds by simulating failures
3 """
4 import logging
5 import contextlib
6 import ceph_manager
7 import itertools
8 import random
9 import signal
10 import time
11
12 from gevent import sleep
13 from gevent.greenlet import Greenlet
14 from gevent.event import Event
15 from teuthology import misc as teuthology
16
17 from tasks.cephfs.filesystem import MDSCluster, Filesystem
18
19 log = logging.getLogger(__name__)
20
21 class DaemonWatchdog(Greenlet):
22     """
23     DaemonWatchdog::
24
25     Watch Ceph daemons for failures. If an extended failure is detected (i.e.
26     not intentional), then the watchdog will unmount file systems and send
27     SIGTERM to all daemons. The duration of an extended failure is configurable
28     with watchdog_daemon_timeout.
29
30     watchdog_daemon_timeout [default: 300]: number of seconds a daemon
31         is allowed to be failed before the watchdog will bark.
32     """
33
34     def __init__(self, ctx, manager, config, thrashers):
35         Greenlet.__init__(self)
36         self.ctx = ctx
37         self.config = config
38         self.e = None
39         self.logger = log.getChild('daemon_watchdog')
40         self.manager = manager
41         self.name = 'watchdog'
42         self.stopping = Event()
43         self.thrashers = thrashers
44
45     def _run(self):
46         try:
47             self.watch()
48         except Exception as e:
49             # See _run exception comment for MDSThrasher
50             self.e = e
51             self.logger.exception("exception:")
52             # allow successful completion so gevent doesn't see an exception...
53
54     def log(self, x):
55         """Write data to logger"""
56         self.logger.info(x)
57
58     def stop(self):
59         self.stopping.set()
60
61     def bark(self):
62         self.log("BARK! unmounting mounts and killing all daemons")
63         for mount in self.ctx.mounts.values():
64             try:
65                 mount.umount_wait(force=True)
66             except:
67                 self.logger.exception("ignoring exception:")
68         daemons = []
69         daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
70         daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
71         for daemon in daemons:
72             try:
73                 daemon.signal(signal.SIGTERM)
74             except:
75                 self.logger.exception("ignoring exception:")
76
77     def watch(self):
78         self.log("watchdog starting")
79         daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
80         daemon_failure_time = {}
81         while not self.stopping.is_set():
82             bark = False
83             now = time.time()
84
85             mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
86             mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
87             clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)
88
89             #for daemon in mons:
90             #    self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
91             #for daemon in mdss:
92             #    self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
93
94             daemon_failures = []
95             daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
96             daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
97             for daemon in daemon_failures:
98                 name = daemon.role + '.' + daemon.id_
99                 dt = daemon_failure_time.setdefault(name, (daemon, now))
100                 assert dt[0] is daemon
101                 delta = now-dt[1]
102                 self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
103                 if delta > daemon_timeout:
104                     bark = True
105
106             # If a daemon is no longer failed, remove it from tracking:
107             for name in daemon_failure_time.keys():
108                 if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
109                     self.log("daemon {name} has been restored".format(name=name))
110                     del daemon_failure_time[name]
111
112             for thrasher in self.thrashers:
113                 if thrasher.e is not None:
114                     self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
115                     bark = True
116
117             if bark:
118                 self.bark()
119                 return
120
121             sleep(5)
122
123         self.log("watchdog finished")
124
125 class MDSThrasher(Greenlet):
126     """
127     MDSThrasher::
128
129     The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).
130
131     The config is optional.  Many of the config parameters are a a maximum value
132     to use when selecting a random value from a range.  To always use the maximum
133     value, set no_random to true.  The config is a dict containing some or all of:
134
135     max_thrash: [default: 1] the maximum number of active MDSs per FS that will be thrashed at
136       any given time.
137
138     max_thrash_delay: [default: 30] maximum number of seconds to delay before
139       thrashing again.
140
141     max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
142       the replay state before thrashing.
143
144     max_revive_delay: [default: 10] maximum number of seconds to delay before
145       bringing back a thrashed MDS.
146
147     randomize: [default: true] enables randomization and use the max/min values
148
149     seed: [no default] seed the random number generator
150
151     thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
152       during replay.  Value should be between 0.0 and 1.0.
153
154     thrash_max_mds: [default: 0.05] likelihood that the max_mds of the mds
155       cluster will be modified to a value [1, current) or (current, starting
156       max_mds]. When reduced, randomly selected MDSs other than rank 0 will be
157       deactivated to reach the new max_mds.  Value should be between 0.0 and 1.0.
158
159     thrash_while_stopping: [default: false] thrash an MDS while there
160       are MDS in up:stopping (because max_mds was changed and some
161       MDS were deactivated).
162
163     thrash_weights: allows specific MDSs to be thrashed more/less frequently.
164       This option overrides anything specified by max_thrash.  This option is a
165       dict containing mds.x: weight pairs.  For example, [mds.a: 0.7, mds.b:
166       0.3, mds.c: 0.0].  Each weight is a value from 0.0 to 1.0.  Any MDSs not
167       specified will be automatically given a weight of 0.0 (not thrashed).
168       For a given MDS, by default the trasher delays for up to
169       max_thrash_delay, trashes, waits for the MDS to recover, and iterates.
170       If a non-zero weight is specified for an MDS, for each iteration the
171       thrasher chooses whether to thrash during that iteration based on a
172       random value [0-1] not exceeding the weight of that MDS.
173
174     Examples::
175
176
177       The following example sets the likelihood that mds.a will be thrashed
178       to 80%, mds.b to 20%, and other MDSs will not be thrashed.  It also sets the
179       likelihood that an MDS will be thrashed in replay to 40%.
180       Thrash weights do not have to sum to 1.
181
182       tasks:
183       - ceph:
184       - mds_thrash:
185           thrash_weights:
186             - mds.a: 0.8
187             - mds.b: 0.2
188           thrash_in_replay: 0.4
189       - ceph-fuse:
190       - workunit:
191           clients:
192             all: [suites/fsx.sh]
193
194       The following example disables randomization, and uses the max delay values:
195
196       tasks:
197       - ceph:
198       - mds_thrash:
199           max_thrash_delay: 10
200           max_revive_delay: 1
201           max_replay_thrash_delay: 4
202
203     """
204
205     def __init__(self, ctx, manager, config, fs, max_mds):
206         Greenlet.__init__(self)
207
208         self.config = config
209         self.ctx = ctx
210         self.e = None
211         self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
212         self.fs = fs
213         self.manager = manager
214         self.max_mds = max_mds
215         self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
216         self.stopping = Event()
217
218         self.randomize = bool(self.config.get('randomize', True))
219         self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
220         self.max_thrash = int(self.config.get('max_thrash', 1))
221         self.max_thrash_delay = float(self.config.get('thrash_delay', 120.0))
222         self.thrash_in_replay = float(self.config.get('thrash_in_replay', False))
223         assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
224             v=self.thrash_in_replay)
225         self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
226         self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
227
228     def _run(self):
229         try:
230             self.do_thrash()
231         except Exception as e:
232             # Log exceptions here so we get the full backtrace (gevent loses them).
233             # Also allow succesful completion as gevent exception handling is a broken mess:
234             #
235             # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
236             #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
237             #     self.print_exception(context, type, value, tb)
238             #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
239             #     traceback.print_exception(type, value, tb, file=errstream)
240             #   File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
241             #     _print(file, 'Traceback (most recent call last):')
242             #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
243             #     file.write(str+terminator)
244             # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
245             self.e = e
246             self.logger.exception("exception:")
247             # allow successful completion so gevent doesn't see an exception...
248
249     def log(self, x):
250         """Write data to logger assigned to this MDThrasher"""
251         self.logger.info(x)
252
253     def stop(self):
254         self.stopping.set()
255
256     def kill_mds(self, mds):
257         if self.config.get('powercycle'):
258             (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
259                          remotes.iterkeys())
260             self.log('kill_mds on mds.{m} doing powercycle of {s}'.
261                      format(m=mds, s=remote.name))
262             self._assert_ipmi(remote)
263             remote.console.power_off()
264         else:
265             self.ctx.daemons.get_daemon('mds', mds).stop()
266
267     @staticmethod
268     def _assert_ipmi(remote):
269         assert remote.console.has_ipmi_credentials, (
270             "powercycling requested but RemoteConsole is not "
271             "initialized.  Check ipmi config.")
272
273     def revive_mds(self, mds, standby_for_rank=None):
274         """
275         Revive mds -- do an ipmpi powercycle (if indicated by the config)
276         and then restart (using --hot-standby if specified.
277         """
278         if self.config.get('powercycle'):
279             (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
280                          remotes.iterkeys())
281             self.log('revive_mds on mds.{m} doing powercycle of {s}'.
282                      format(m=mds, s=remote.name))
283             self._assert_ipmi(remote)
284             remote.console.power_on()
285             self.manager.make_admin_daemon_dir(self.ctx, remote)
286         args = []
287         if standby_for_rank:
288             args.extend(['--hot-standby', standby_for_rank])
289         self.ctx.daemons.get_daemon('mds', mds).restart(*args)
290
291     def wait_for_stable(self, rank = None, gid = None):
292         self.log('waiting for mds cluster to stabilize...')
293         for itercount in itertools.count():
294             status = self.fs.status()
295             max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
296             ranks = list(status.get_ranks(self.fs.id))
297             stopping = filter(lambda info: "up:stopping" == info['state'], ranks)
298             actives = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, ranks)
299
300             if not bool(self.config.get('thrash_while_stopping', False)) and len(stopping) > 0:
301                 if itercount % 5 == 0:
302                     self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
303             else:
304                 if rank is not None:
305                     try:
306                         info = status.get_rank(self.fs.id, rank)
307                         if info['gid'] != gid and "up:active" == info['state']:
308                             self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
309                             return status
310                     except:
311                         pass # no rank present
312                     if len(actives) >= max_mds:
313                         # no replacement can occur!
314                         self.log("cluster has %d actives (max_mds is %d), no MDS can replace rank %d".format(len(actives), max_mds, rank))
315                         return status
316                 else:
317                     if len(actives) >= max_mds:
318                         self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
319                         return status, None
320             if itercount > 300/2: # 5 minutes
321                  raise RuntimeError('timeout waiting for cluster to stabilize')
322             elif itercount % 5 == 0:
323                 self.log('mds map: {status}'.format(status=status))
324             else:
325                 self.log('no change')
326             sleep(2)
327
328     def do_thrash(self):
329         """
330         Perform the random thrashing action
331         """
332
333         self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
334         stats = {
335             "max_mds": 0,
336             "deactivate": 0,
337             "kill": 0,
338         }
339
340         while not self.stopping.is_set():
341             delay = self.max_thrash_delay
342             if self.randomize:
343                 delay = random.randrange(0.0, self.max_thrash_delay)
344
345             if delay > 0.0:
346                 self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
347                 self.stopping.wait(delay)
348                 if self.stopping.is_set():
349                     continue
350
351             status = self.fs.status()
352
353             if random.random() <= self.thrash_max_mds:
354                 max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
355                 options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
356                 if len(options) > 0:
357                     sample = random.sample(options, 1)
358                     new_max_mds = sample[0]
359                     self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
360                     self.fs.set_max_mds(new_max_mds)
361                     stats['max_mds'] += 1
362
363                     targets = filter(lambda r: r['rank'] >= new_max_mds, status.get_ranks(self.fs.id))
364                     if len(targets) > 0:
365                         # deactivate mds in decending order
366                         targets = sorted(targets, key=lambda r: r['rank'], reverse=True)
367                         for target in targets:
368                             self.log("deactivating rank %d" % target['rank'])
369                             self.fs.deactivate(target['rank'])
370                             stats['deactivate'] += 1
371                             status = self.wait_for_stable()[0]
372                     else:
373                         status = self.wait_for_stable()[0]
374
375             count = 0
376             for info in status.get_ranks(self.fs.id):
377                 name = info['name']
378                 label = 'mds.' + name
379                 rank = info['rank']
380                 gid = info['gid']
381
382                 # if thrash_weights isn't specified and we've reached max_thrash,
383                 # we're done
384                 count = count + 1
385                 if 'thrash_weights' not in self.config and count > self.max_thrash:
386                     break
387
388                 weight = 1.0
389                 if 'thrash_weights' in self.config:
390                     weight = self.config['thrash_weights'].get(label, '0.0')
391                 skip = random.randrange(0.0, 1.0)
392                 if weight <= skip:
393                     self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=weight))
394                     continue
395
396                 self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
397                 self.kill_mds(name)
398                 stats['kill'] += 1
399
400                 # wait for mon to report killed mds as crashed
401                 last_laggy_since = None
402                 itercount = 0
403                 while True:
404                     status = self.fs.status()
405                     info = status.get_mds(name)
406                     if not info:
407                         break
408                     if 'laggy_since' in info:
409                         last_laggy_since = info['laggy_since']
410                         break
411                     if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
412                         break
413                     self.log(
414                         'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
415                             label=label))
416                     itercount = itercount + 1
417                     if itercount > 10:
418                         self.log('mds map: {status}'.format(status=status))
419                     sleep(2)
420
421                 if last_laggy_since:
422                     self.log(
423                         '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
424                 else:
425                     self.log('{label} down, removed from mdsmap'.format(label=label, since=last_laggy_since))
426
427                 # wait for a standby mds to takeover and become active
428                 status = self.wait_for_stable(rank, gid)
429
430                 # wait for a while before restarting old active to become new
431                 # standby
432                 delay = self.max_revive_delay
433                 if self.randomize:
434                     delay = random.randrange(0.0, self.max_revive_delay)
435
436                 self.log('waiting for {delay} secs before reviving {label}'.format(
437                     delay=delay, label=label))
438                 sleep(delay)
439
440                 self.log('reviving {label}'.format(label=label))
441                 self.revive_mds(name)
442
443                 for itercount in itertools.count():
444                     if itercount > 300/2: # 5 minutes
445                         raise RuntimeError('timeout waiting for MDS to revive')
446                     status = self.fs.status()
447                     info = status.get_mds(name)
448                     if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
449                         self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
450                         break
451                     self.log(
452                         'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
453                     sleep(2)
454
455         for stat in stats:
456             self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))
457
458              # don't do replay thrashing right now
459 #            for info in status.get_replays(self.fs.id):
460 #                # this might race with replay -> active transition...
461 #                if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
462 #                    delay = self.max_replay_thrash_delay
463 #                    if self.randomize:
464 #                        delay = random.randrange(0.0, self.max_replay_thrash_delay)
465 #                sleep(delay)
466 #                self.log('kill replaying mds.{id}'.format(id=self.to_kill))
467 #                self.kill_mds(self.to_kill)
468 #
469 #                delay = self.max_revive_delay
470 #                if self.randomize:
471 #                    delay = random.randrange(0.0, self.max_revive_delay)
472 #
473 #                self.log('waiting for {delay} secs before reviving mds.{id}'.format(
474 #                    delay=delay, id=self.to_kill))
475 #                sleep(delay)
476 #
477 #                self.log('revive mds.{id}'.format(id=self.to_kill))
478 #                self.revive_mds(self.to_kill)
479
480
481 @contextlib.contextmanager
482 def task(ctx, config):
483     """
484     Stress test the mds by thrashing while another task/workunit
485     is running.
486
487     Please refer to MDSThrasher class for further information on the
488     available options.
489     """
490
491     mds_cluster = MDSCluster(ctx)
492
493     if config is None:
494         config = {}
495     assert isinstance(config, dict), \
496         'mds_thrash task only accepts a dict for configuration'
497     mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
498     assert len(mdslist) > 1, \
499         'mds_thrash task requires at least 2 metadata servers'
500
501     # choose random seed
502     if 'seed' in config:
503         seed = int(config['seed'])
504     else:
505         seed = int(time.time())
506     log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
507     random.seed(seed)
508
509     (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
510     manager = ceph_manager.CephManager(
511         first, ctx=ctx, logger=log.getChild('ceph_manager'),
512     )
513
514     # make sure everyone is in active, standby, or standby-replay
515     log.info('Wait for all MDSs to reach steady state...')
516     status = mds_cluster.status()
517     while True:
518         steady = True
519         for info in status.get_all():
520             state = info['state']
521             if state not in ('up:active', 'up:standby', 'up:standby-replay'):
522                 steady = False
523                 break
524         if steady:
525             break
526         sleep(2)
527         status = mds_cluster.status()
528     log.info('Ready to start thrashing')
529
530     thrashers = []
531
532     watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
533     watchdog.start()
534
535     manager.wait_for_clean()
536     assert manager.is_clean()
537     for fs in status.get_filesystems():
538         thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
539         thrasher.start()
540         thrashers.append(thrasher)
541
542     try:
543         log.debug('Yielding')
544         yield
545     finally:
546         log.info('joining mds_thrashers')
547         for thrasher in thrashers:
548             thrasher.stop()
549             if thrasher.e:
550                 raise RuntimeError('error during thrashing')
551             thrasher.join()
552         log.info('done joining')
553
554         watchdog.stop()
555         watchdog.join()