2 ceph manager -- Thrasher and CephManager objects
4 from cStringIO import StringIO
5 from functools import wraps
17 from teuthology import misc as teuthology
18 from tasks.scrub import Scrubber
19 from util.rados import cmd_erasure_code_profile
20 from util import get_remote
21 from teuthology.contextutil import safe_while
22 from teuthology.orchestra.remote import Remote
23 from teuthology.orchestra import run
24 from teuthology.exceptions import CommandFailedError
27 from subprocess import DEVNULL # py3k
29 DEVNULL = open(os.devnull, 'r+')
31 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
33 log = logging.getLogger(__name__)
# Render the in-memory ceph.conf for `cluster` and push it to conf_path on
# every remote in ctx.cluster: mkdir/chmod /etc/ceph, stream the conf over
# stdin into a python `file(...)` write (py2), then chmod the result 0644.
# NOTE(review): this listing is subsampled -- the conf_fp setup and parts of
# the run() argument list are not visible here; confirm against upstream.
36 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
38 ctx.ceph[cluster].conf.write(conf_fp)
40 writes = ctx.cluster.run(
42 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
43 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
46 ('import shutil, sys; '
47 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
50 'sudo', 'chmod', '0644', conf_path,
# feed the same conf file object to every remote's stdin, then close them
54 teuthology.feed_many_stdins_and_close(conf_fp, writes)
# Mount the data device recorded in ctx.disk_config for one osd of `cluster`
# on `remote`, using the fstype and mount options captured at provisioning.
# Role lookup tries the cluster-qualified role first ("{cluster}.osd.{id}"),
# falling back to the bare "osd.{id}" form for the default cluster.
# NOTE(review): subsampled listing -- the actual mount command invocation and
# several guard lines are missing from this view.
58 def mount_osd_data(ctx, remote, cluster, osd):
63 :param remote: Remote site
64 :param cluster: name of ceph cluster
67 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
68 role = "{0}.osd.{1}".format(cluster, osd)
69 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
70 if remote in ctx.disk_config.remote_to_roles_to_dev:
71 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
73 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
75 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
76 mount_options = ctx.disk_config.\
77 remote_to_roles_to_dev_mount_options[remote][role]
78 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
79 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
81 log.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
82 'mountpoint: {p}, type: {t}, options: {v}'.format(
83 o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
91 '-o', ','.join(mount_options),
100 Object used to thrash Ceph
# Thrasher constructor: waits for a clean cluster, snapshots the osd
# in/out/live/dead sets, reads all thrash tuning knobs from `config`,
# disables mon auto-out while thrashing, probes for ceph-objectstore-tool,
# and spawns the background greenlets (do_thrash plus optional togglers).
# NOTE(review): subsampled listing -- several lines (config default, else
# branches, injectargs fallback) are missing from this view.
102 def __init__(self, manager, config, logger=None):
103 self.ceph_manager = manager
104 self.cluster = manager.cluster
105 self.ceph_manager.wait_for_clean()
106 osd_status = self.ceph_manager.get_osd_status()
107 self.in_osds = osd_status['in']
108 self.live_osds = osd_status['live']
109 self.out_osds = osd_status['out']
110 self.dead_osds = osd_status['dead']
111 self.stopping = False
114 self.revive_timeout = self.config.get("revive_timeout", 360)
115 self.pools_to_fix_pgp_num = set()
116 if self.config.get('powercycle'):
# powercycled nodes take longer to come back; pad the revive timeout
117 self.revive_timeout += 120
118 self.clean_wait = self.config.get('clean_wait', 0)
119 self.minin = self.config.get("min_in", 4)
120 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
121 self.sighup_delay = self.config.get('sighup_delay')
122 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
123 self.dump_ops_enable = self.config.get('dump_ops_enable')
124 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
125 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
126 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
# NOTE(review): reads 'chance_thrash_pg_upmap' here too -- probably meant
# 'chance_thrash_pg_upmap_items'; confirm against upstream before changing.
127 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap', 1.0)
128 self.random_eio = self.config.get('random_eio')
129 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
131 num_osds = self.in_osds + self.out_osds
132 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
133 if self.logger is not None:
134 self.log = lambda x: self.logger.info(x)
138 Implement log behavior
142 if self.config is None:
144 # prevent monitor from auto-marking things out while thrasher runs
145 # try both old and new tell syntax, in case we are testing old code
146 self.saved_options = []
147 # assuming that the default settings do not vary from one daemon to
149 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
150 opts = [('mon', 'mon_osd_down_out_interval', 0)]
151 for service, opt, new_value in opts:
152 old_value = manager.get_config(first_mon[0],
# remember the original value so it can be restored when thrashing stops
155 self.saved_options.append((service, opt, old_value))
156 self._set_config(service, '*', opt, new_value)
157 # initialize ceph_objectstore_tool property - must be done before
158 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
159 if (self.config.get('powercycle') or
160 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
161 self.config.get('disable_objectstore_tool_tests', False)):
162 self.ceph_objectstore_tool = False
163 self.test_rm_past_intervals = False
164 if self.config.get('powercycle'):
165 self.log("Unable to test ceph-objectstore-tool, "
166 "powercycle testing")
168 self.log("Unable to test ceph-objectstore-tool, "
169 "not available on all OSD nodes")
171 self.ceph_objectstore_tool = \
172 self.config.get('ceph_objectstore_tool', True)
173 self.test_rm_past_intervals = \
174 self.config.get('test_rm_past_intervals', True)
# main thrashing loop plus optional side-effect greenlets
176 self.thread = gevent.spawn(self.do_thrash)
177 if self.sighup_delay:
178 self.sighup_thread = gevent.spawn(self.do_sighup)
179 if self.optrack_toggle_delay:
180 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
181 if self.dump_ops_enable == "true":
182 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
183 if self.noscrub_toggle_delay:
184 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
186 def _set_config(self, service_type, service_id, name, value):
187 opt_arg = '--{name} {value}'.format(name=name, value=value)
188 whom = '.'.join([service_type, service_id])
189 self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
190 'injectargs', opt_arg)
# Return whether `cmd` resolves (shell `type`) on every OSD remote of this
# cluster. NOTE(review): subsampled listing -- the return statements and the
# stderr kwarg are missing from this view.
193 def cmd_exists_on_osds(self, cmd):
194 allremotes = self.ceph_manager.ctx.cluster.only(\
195 teuthology.is_type('osd', self.cluster)).remotes.keys()
# de-duplicate: several osd roles may map to the same physical remote
196 allremotes = list(set(allremotes))
197 for remote in allremotes:
198 proc = remote.run(args=['type', cmd], wait=True,
199 check_status=False, stdout=StringIO(),
201 if proc.exitstatus != 0:
# Kill an osd (random live one if unspecified), optionally marking it
# down/out, then -- if enabled -- exercise ceph-objectstore-tool on the dead
# osd: export a random PG, remove it, optionally re-import it onto another
# dead osd (copying the export file between machines when needed), and apply
# low filestore split/merge settings per pool.
# NOTE(review): subsampled listing -- many guard/else/continue lines and some
# call arguments are missing from this view; read with upstream at hand.
205 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
207 :param osd: Osd to be killed.
208 :mark_down: Mark down if true.
209 :mark_out: Mark out if true.
212 osd = random.choice(self.live_osds)
213 self.log("Killing osd %s, live_osds are %s" % (str(osd),
214 str(self.live_osds)))
215 self.live_osds.remove(osd)
216 self.dead_osds.append(osd)
217 self.ceph_manager.kill_osd(osd)
219 self.ceph_manager.mark_down_osd(osd)
220 if mark_out and osd in self.in_osds:
222 if self.ceph_objectstore_tool:
223 self.log("Testing ceph-objectstore-tool on down osd")
224 remote = self.ceph_manager.find_remote('osd', osd)
225 FSPATH = self.ceph_manager.get_filepath()
226 JPATH = os.path.join(FSPATH, "journal")
# exp_* = source of the PG export, imp_* = destination of the import
227 exp_osd = imp_osd = osd
228 exp_remote = imp_remote = remote
229 # If an older osd is available we'll move a pg from there
230 if (len(self.dead_osds) > 1 and
231 random.random() < self.chance_move_pg):
232 exp_osd = random.choice(self.dead_osds[:-1])
233 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
234 if ('keyvaluestore_backend' in
235 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
236 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
237 "--data-path {fpath} --journal-path {jpath} "
238 "--type keyvaluestore "
240 "/var/log/ceph/objectstore_tool.\\$pid.log ".
241 format(fpath=FSPATH, jpath=JPATH))
243 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
244 "--data-path {fpath} --journal-path {jpath} "
246 "/var/log/ceph/objectstore_tool.\\$pid.log ".
247 format(fpath=FSPATH, jpath=JPATH))
248 cmd = (prefix + "--op list-pgs").format(id=exp_osd)
250 # ceph-objectstore-tool might be temporarily absent during an
251 # upgrade - see http://tracker.ceph.com/issues/18014
252 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
254 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
255 wait=True, check_status=False, stdout=StringIO(),
257 if proc.exitstatus == 0:
259 log.debug("ceph-objectstore-tool binary not present, trying again")
261 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
262 # see http://tracker.ceph.com/issues/19556
263 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
265 proc = exp_remote.run(args=cmd, wait=True,
267 stdout=StringIO(), stderr=StringIO())
268 if proc.exitstatus == 0:
# NOTE(review): proc.stderr is a StringIO (set above); comparing it to a
# str with == looks wrong -- likely wants proc.stderr.getvalue(); confirm.
270 elif proc.exitstatus == 1 and proc.stderr == "OSD has the store locked":
273 raise Exception("ceph-objectstore-tool: "
274 "exp list-pgs failure with status {ret}".
275 format(ret=proc.exitstatus))
# trailing '' from the final newline is dropped by [:-1]
277 pgs = proc.stdout.getvalue().split('\n')[:-1]
279 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
281 pg = random.choice(pgs)
282 exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
283 exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
284 exp_path = os.path.join(exp_path,
285 "exp.{pg}.{id}".format(
289 # Can't use new export-remove op since this is part of upgrade testing
290 cmd = prefix + "--op export --pgid {pg} --file {file}"
291 cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
292 proc = exp_remote.run(args=cmd)
294 raise Exception("ceph-objectstore-tool: "
295 "export failure with status {ret}".
296 format(ret=proc.exitstatus))
298 cmd = prefix + "--force --op remove --pgid {pg}"
299 cmd = cmd.format(id=exp_osd, pg=pg)
300 proc = exp_remote.run(args=cmd)
302 raise Exception("ceph-objectstore-tool: "
303 "remove failure with status {ret}".
304 format(ret=proc.exitstatus))
305 # If there are at least 2 dead osds we might move the pg
306 if exp_osd != imp_osd:
307 # If pg isn't already on this osd, then we will move it there
308 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
309 proc = imp_remote.run(args=cmd, wait=True,
310 check_status=False, stdout=StringIO())
312 raise Exception("ceph-objectstore-tool: "
313 "imp list-pgs failure with status {ret}".
314 format(ret=proc.exitstatus))
315 pgs = proc.stdout.getvalue().split('\n')[:-1]
317 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
318 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
319 if imp_remote != exp_remote:
320 # Copy export file to the other machine
321 self.log("Transfer export file from {srem} to {trem}".
322 format(srem=exp_remote, trem=imp_remote))
323 tmpexport = Remote.get_file(exp_remote, exp_path)
324 Remote.put_file(imp_remote, tmpexport, exp_path)
327 # Can't move the pg after all
329 imp_remote = exp_remote
331 cmd = (prefix + "--op import --file {file}")
332 cmd = cmd.format(id=imp_osd, file=exp_path)
333 proc = imp_remote.run(args=cmd, wait=True, check_status=False,
# known-benign import failures are logged and tolerated; anything else raises
335 if proc.exitstatus == 1:
336 bogosity = "The OSD you are using is older than the exported PG"
337 if bogosity in proc.stderr.getvalue():
338 self.log("OSD older than exported PG"
340 elif proc.exitstatus == 10:
341 self.log("Pool went away before processing an import"
343 elif proc.exitstatus == 11:
344 self.log("Attempt to import an incompatible export"
346 elif proc.exitstatus:
347 raise Exception("ceph-objectstore-tool: "
348 "import failure with status {ret}".
349 format(ret=proc.exitstatus))
350 cmd = "rm -f {file}".format(file=exp_path)
351 exp_remote.run(args=cmd)
352 if imp_remote != exp_remote:
353 imp_remote.run(args=cmd)
355 # apply low split settings to each pool
356 for pool in self.ceph_manager.list_pools():
# strip the leading "sudo " so CEPH_ARGS survives into the tool's env
357 no_sudo_prefix = prefix[5:]
358 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
359 "--filestore-split-multiple 1' sudo -E "
360 + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
361 proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
362 output = proc.stderr.getvalue()
# pool may have been deleted concurrently by the test; that is tolerated
363 if 'Couldn\'t find pool' in output:
366 raise Exception("ceph-objectstore-tool apply-layout-settings"
367 " failed with {status}".format(status=proc.exitstatus))
# When enabled, pick a dead osd (random if unspecified), list its PGs with
# ceph-objectstore-tool and run '--op rm-past-intervals' on one random PG.
# NOTE(review): subsampled listing -- guard/else lines are missing here.
369 def rm_past_intervals(self, osd=None):
371 :param osd: Osd to find pg to remove past intervals
373 if self.test_rm_past_intervals:
375 osd = random.choice(self.dead_osds)
376 self.log("Use ceph_objectstore_tool to remove past intervals")
377 remote = self.ceph_manager.find_remote('osd', osd)
378 FSPATH = self.ceph_manager.get_filepath()
379 JPATH = os.path.join(FSPATH, "journal")
380 if ('keyvaluestore_backend' in
381 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
382 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
383 "--data-path {fpath} --journal-path {jpath} "
384 "--type keyvaluestore "
386 "/var/log/ceph/objectstore_tool.\\$pid.log ".
387 format(fpath=FSPATH, jpath=JPATH))
389 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
390 "--data-path {fpath} --journal-path {jpath} "
392 "/var/log/ceph/objectstore_tool.\\$pid.log ".
393 format(fpath=FSPATH, jpath=JPATH))
394 cmd = (prefix + "--op list-pgs").format(id=osd)
395 proc = remote.run(args=cmd, wait=True,
396 check_status=False, stdout=StringIO())
398 raise Exception("ceph_objectstore_tool: "
399 "exp list-pgs failure with status {ret}".
400 format(ret=proc.exitstatus))
401 pgs = proc.stdout.getvalue().split('\n')[:-1]
403 self.log("No PGs found for osd.{osd}".format(osd=osd))
405 pg = random.choice(pgs)
406 cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
407 format(id=osd, pg=pg)
408 proc = remote.run(args=cmd)
410 raise Exception("ceph_objectstore_tool: "
411 "rm-past-intervals failure with status {ret}".
412 format(ret=proc.exitstatus))
# Blackhole (drop all I/O of) and then kill an osd; random live one if
# unspecified. Bookkeeping mirrors kill_osd: move osd live -> dead.
414 def blackhole_kill_osd(self, osd=None):
416 If all else fails, kill the osd.
417 :param osd: Osd to be killed.
420 osd = random.choice(self.live_osds)
421 self.log("Blackholing and then killing osd %s, live_osds are %s" %
422 (str(osd), str(self.live_osds)))
423 self.live_osds.remove(osd)
424 self.dead_osds.append(osd)
425 self.ceph_manager.blackhole_kill_osd(osd)
# Revive an osd (random dead one if unspecified), move it dead -> live,
# and if random-EIO injection is active for this osd, re-inject the read
# error probabilities (injectargs are lost across a daemon restart).
427 def revive_osd(self, osd=None, skip_admin_check=False):
430 :param osd: Osd to be revived.
433 osd = random.choice(self.dead_osds)
434 self.log("Reviving osd %s" % (str(osd),))
435 self.ceph_manager.revive_osd(
438 skip_admin_check=skip_admin_check)
439 self.dead_osds.remove(osd)
440 self.live_osds.append(osd)
# NOTE(review): 'osd is self.rerrosd' is an identity compare on ints --
# works for CPython small ints but '==' would be the safe spelling; confirm.
441 if self.random_eio > 0 and osd is self.rerrosd:
442 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
443 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
444 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
445 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
# Mark an osd out (random in-osd if unspecified) and move it in -> out.
448 def out_osd(self, osd=None):
451 :param osd: Osd to be marked.
454 osd = random.choice(self.in_osds)
455 self.log("Removing osd %s, in_osds are: %s" %
456 (str(osd), str(self.in_osds)))
457 self.ceph_manager.mark_out_osd(osd)
458 self.in_osds.remove(osd)
459 self.out_osds.append(osd)
# Mark an osd in (random out-osd if unspecified); a dead osd is revived
# instead (revive_osd handles the rest). Moves osd out -> in.
461 def in_osd(self, osd=None):
464 :param osd: Osd to be marked.
467 osd = random.choice(self.out_osds)
468 if osd in self.dead_osds:
469 return self.revive_osd(osd)
470 self.log("Adding osd %s" % (str(osd),))
471 self.out_osds.remove(osd)
472 self.in_osds.append(osd)
473 self.ceph_manager.mark_in_osd(osd)
474 self.log("Added osd %s" % (str(osd),))
# Either reweight one in-osd to a random weight in [.1, 1.0], or run a
# randomized '(test-)reweight-by-utilization' with random overage/max_change.
# NOTE(review): subsampled listing -- the else branch and part of the
# raw_cluster_cmd argument list are missing from this view.
476 def reweight_osd_or_by_util(self, osd=None):
478 Reweight an osd that is in
479 :param osd: Osd to be marked.
481 if osd is not None or random.choice([True, False]):
483 osd = random.choice(self.in_osds)
484 val = random.uniform(.1, 1.0)
485 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
486 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
489 # do it several times, the option space is large
492 'max_change': random.choice(['0.05', '1.0', '3.0']),
493 'overage': random.choice(['110', '1000']),
494 'type': random.choice([
495 'reweight-by-utilization',
496 'test-reweight-by-utilization']),
498 self.log("Reweighting by: %s"%(str(options),))
499 self.ceph_manager.raw_cluster_cmd(
503 options['max_change'])
# Set a random primary-affinity on an osd (random in-osd if unspecified).
# NOTE(review): subsampled listing -- the assignments of `pa` for each random
# branch are missing from this view.
505 def primary_affinity(self, osd=None):
507 osd = random.choice(self.in_osds)
508 if random.random() >= .5:
510 elif random.random() >= .5:
514 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
515 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
# Temporarily drive the cluster into a full condition by dropping the full
# ratio to .001, then restore it to .95. NOTE(review): the wait between the
# two settings is missing from this subsampled view.
518 def thrash_cluster_full(self):
520 Set and unset cluster full condition
522 self.log('Setting full ratio to .001')
523 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
525 self.log('Setting full ratio back to .95')
526 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
# 70% of the time install a random pg-upmap for a random PG (mapping it to a
# shuffled subset of osds sized by the pool's replica count); otherwise clear
# an existing pg_upmap entry. Command failures are logged and ignored.
# NOTE(review): subsampled listing -- json parse, shuffle/slice and the
# rm-pg-upmap argument lines are missing from this view.
528 def thrash_pg_upmap(self):
530 Install or remove random pg_upmap entries in OSDMap
532 from random import shuffle
533 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
535 self.log('j is %s' % j)
537 if random.random() >= .3:
538 pgs = self.ceph_manager.get_pg_stats()
539 pg = random.choice(pgs)
540 pgid = str(pg['pgid'])
541 poolid = int(pgid.split('.')[0])
542 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
546 osds = self.in_osds + self.out_osds
549 self.log('Setting %s to %s' % (pgid, osds))
550 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
551 self.log('cmd %s' % cmd)
552 self.ceph_manager.raw_cluster_cmd(*cmd)
558 self.log('Clearing pg_upmap on %s' % pg)
559 self.ceph_manager.raw_cluster_cmd(
564 self.log('No pg_upmap entries; doing nothing')
565 except CommandFailedError:
566 self.log('Failed to rm-pg-upmap, ignoring')
# Same as thrash_pg_upmap but for pg-upmap-items entries (osd-pair
# remappings) in the OSDMap. Failures are logged and ignored.
# NOTE(review): subsampled listing -- several lines are missing here.
568 def thrash_pg_upmap_items(self):
570 Install or remove random pg_upmap_items entries in OSDMap
572 from random import shuffle
573 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
575 self.log('j is %s' % j)
577 if random.random() >= .3:
578 pgs = self.ceph_manager.get_pg_stats()
579 pg = random.choice(pgs)
580 pgid = str(pg['pgid'])
581 poolid = int(pgid.split('.')[0])
582 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
586 osds = self.in_osds + self.out_osds
589 self.log('Setting %s to %s' % (pgid, osds))
590 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
591 self.log('cmd %s' % cmd)
592 self.ceph_manager.raw_cluster_cmd(*cmd)
594 m = j['pg_upmap_items']
598 self.log('Clearing pg_upmap on %s' % pg)
599 self.ceph_manager.raw_cluster_cmd(
604 self.log('No pg_upmap entries; doing nothing')
605 except CommandFailedError:
606 self.log('Failed to rm-pg-upmap-items, ignoring')
# Force recovery or backfill (50/50) on the PGs reported as eligible.
# NOTE(review): the if/else guarding the two commands is missing from this
# subsampled view.
608 def force_recovery(self):
610 Force recovery on some of PGs
612 backfill = random.random() >= 0.5
613 j = self.ceph_manager.get_pgids_to_force(backfill)
616 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
618 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
# Cancel a previously forced recovery/backfill (50/50) on eligible PGs.
# NOTE(review): the if/else guarding the two commands is missing from this
# subsampled view.
620 def cancel_force_recovery(self):
622 Force recovery on some of PGs
624 backfill = random.random() >= 0.5
625 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
628 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
630 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
# 60% of the time force recovery, otherwise cancel a forced recovery.
632 def force_cancel_recovery(self):
634 Force or cancel forcing recovery
636 if random.random() >= 0.4:
637 self.force_recovery()
639 self.cancel_force_recovery()
# Fragments of two helpers whose 'def' lines fall outside this subsampled
# view: one revives every dead osd and ins every out osd; the other resets
# reweight and primary-affinity for all live osds.
643 Make sure all osds are up and not out.
645 while len(self.dead_osds) > 0:
646 self.log("reviving osd")
648 while len(self.out_osds) > 0:
649 self.log("inning osd")
654 Make sure all osds are up and fully in.
657 for osd in self.live_osds:
658 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
660 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
# Fragment of the stop/join helper (its 'def' line is outside this view):
# joins each optional background greenlet that __init__ spawned.
# NOTE(review): sighup uses .get() while the others use .join() -- .get()
# re-raises any exception from the greenlet; presumably intentional.
665 Break out of this Ceph loop
669 if self.sighup_delay:
670 self.log("joining the do_sighup greenlet")
671 self.sighup_thread.get()
672 if self.optrack_toggle_delay:
673 self.log("joining the do_optrack_toggle greenlet")
674 self.optrack_toggle_thread.join()
675 if self.dump_ops_enable == "true":
676 self.log("joining the do_dump_ops greenlet")
677 self.dump_ops_thread.join()
678 if self.noscrub_toggle_delay:
679 self.log("joining the do_noscrub_toggle greenlet")
680 self.noscrub_toggle_thread.join()
# Fragment of grow_pool (its 'def' line is outside this view): expand a
# random pool's pg_num and remember it so pgp_num can be fixed up later.
684 Increase the size of the pool
686 pool = self.ceph_manager.get_pool()
687 orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
688 self.log("Growing pool %s" % (pool,))
689 if self.ceph_manager.expand_pool(pool,
690 self.config.get('pool_grow_by', 10),
692 self.pools_to_fix_pgp_num.add(pool)
# Bring a pool's pgp_num in line with its pg_num (random tracked pool if
# unspecified); on success drop it from the pending-fix set.
# NOTE(review): the computation of `force` is missing from this subsampled view.
694 def fix_pgp_num(self, pool=None):
696 Fix number of pgs in pool.
699 pool = self.ceph_manager.get_pool()
703 self.log("fixing pg num pool %s" % (pool,))
704 if self.ceph_manager.set_pool_pgpnum(pool, force):
705 self.pools_to_fix_pgp_num.discard(pool)
# Kill and out every osd except one random survivor, wait, then kill the
# survivor too, revive/in the others, and wait for recovery -- exercising
# pool min_size behavior.
707 def test_pool_min_size(self):
709 Kill and revive all osds except one.
711 self.log("test_pool_min_size")
713 self.ceph_manager.wait_for_recovery(
714 timeout=self.config.get('timeout')
716 the_one = random.choice(self.in_osds)
# NOTE(review): self.log is a one-arg lambda (see __init__); this two-arg
# call looks like it would raise TypeError -- probably meant % formatting.
717 self.log("Killing everyone but %s", the_one)
# py2 filter() returns a list, so iterating it twice below is fine here
718 to_kill = filter(lambda x: x != the_one, self.in_osds)
719 [self.kill_osd(i) for i in to_kill]
720 [self.out_osd(i) for i in to_kill]
721 time.sleep(self.config.get("test_pool_min_size_time", 10))
722 self.log("Killing %s" % (the_one,))
723 self.kill_osd(the_one)
724 self.out_osd(the_one)
725 self.log("Reviving everyone but %s" % (the_one,))
726 [self.revive_osd(i) for i in to_kill]
727 [self.in_osd(i) for i in to_kill]
728 self.log("Revived everyone but %s" % (the_one,))
729 self.log("Waiting for clean")
730 self.ceph_manager.wait_for_recovery(
731 timeout=self.config.get('timeout')
# Inject a pause (via conf_key=duration) into a random live osd, then after
# check_after seconds assert the osd is (or is not, per should_be_down)
# reported down, and that it recovers once the pause has elapsed.
734 def inject_pause(self, conf_key, duration, check_after, should_be_down):
736 Pause injection testing. Check for osd being down when finished.
738 the_one = random.choice(self.live_osds)
739 self.log("inject_pause on {osd}".format(osd=the_one))
741 "Testing {key} pause injection for duration {duration}".format(
746 "Checking after {after}, should_be_down={shouldbedown}".format(
748 shouldbedown=should_be_down
750 self.ceph_manager.set_config(the_one, **{conf_key: duration})
# when the pause should not mark the osd down there is nothing to verify
751 if not should_be_down:
753 time.sleep(check_after)
754 status = self.ceph_manager.get_osd_status()
755 assert the_one in status['down']
# +20s slack for the osd to recover and the map to catch up
756 time.sleep(duration - check_after + 20)
757 status = self.ceph_manager.get_osd_status()
758 assert not the_one in status['down']
# Simulate backfillfull on all live osds (admin-socket 'injectfull'),
# randomly toggling the skip-full-check debug option, wait until no PG is
# backfilling, assert that, then undo both injections.
760 def test_backfill_full(self):
762 Test backfills stopping when the replica fills up.
764 First, use injectfull admin command to simulate a now full
765 osd by setting it to 0 on all of the OSDs.
767 Second, on a random subset, set
768 osd_debug_skip_full_check_in_backfill_reservation to force
769 the more complicated check in do_scan to be exercised.
771 Then, verify that all backfillings stop.
773 self.log("injecting backfill full")
774 for i in self.live_osds:
775 self.ceph_manager.set_config(
777 osd_debug_skip_full_check_in_backfill_reservation=
778 random.choice(['false', 'true']))
779 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
780 check_status=True, timeout=30, stdout=DEVNULL)
782 status = self.ceph_manager.compile_pg_status()
783 if 'backfilling' not in status.keys():
786 "waiting for {still_going} backfillings".format(
787 still_going=status.get('backfilling')))
789 assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
790 for i in self.live_osds:
791 self.ceph_manager.set_config(
793 osd_debug_skip_full_check_in_backfill_reservation='false')
794 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
795 check_status=True, timeout=30, stdout=DEVNULL)
# Force a revived osd to handle an osdmap gap: recover fully, kill an osd
# (down+out), wait for clean, then sleep long enough for the mons to trim
# old maps before the osd comes back.
# NOTE(review): subsampled listing -- the in_osd() call of the first loop and
# the final revive are missing from this view.
797 def test_map_discontinuity(self):
799 1) Allows the osds to recover
801 3) allows the remaining osds to recover
802 4) waits for some time
804 This sequence should cause the revived osd to have to handle
805 a map gap since the mons would have trimmed
# ensure enough osds are in so one can be spared
807 while len(self.in_osds) < (self.minin + 1):
809 self.log("Waiting for recovery")
810 self.ceph_manager.wait_for_all_osds_up(
811 timeout=self.config.get('timeout')
813 # now we wait 20s for the pg status to change, if it takes longer,
814 # the test *should* fail!
816 self.ceph_manager.wait_for_clean(
817 timeout=self.config.get('timeout')
820 # now we wait 20s for the backfill replicas to hear about the clean
822 self.log("Recovered, killing an osd")
823 self.kill_osd(mark_down=True, mark_out=True)
824 self.log("Waiting for clean again")
825 self.ceph_manager.wait_for_clean(
826 timeout=self.config.get('timeout')
828 self.log("Waiting for trim")
829 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
# Build a weighted list of candidate thrash actions based on current osd
# counts and config chances, then pick one by sampling uniform(0, total)
# against cumulative weights and return it.
# NOTE(review): subsampled listing -- `minin` assignment, `actions = []`,
# mincheck lines, and the final selection/return lines are missing here.
832 def choose_action(self):
834 Random action selector.
836 chance_down = self.config.get('chance_down', 0.4)
837 chance_test_min_size = self.config.get('chance_test_min_size', 0)
838 chance_test_backfill_full = \
839 self.config.get('chance_test_backfill_full', 0)
# integer chance_down is interpreted as a percentage
840 if isinstance(chance_down, int):
841 chance_down = float(chance_down) / 100
843 minout = self.config.get("min_out", 0)
844 minlive = self.config.get("min_live", 2)
845 mindead = self.config.get("min_dead", 0)
847 self.log('choose_action: min_in %d min_out '
848 '%d min_live %d min_dead %d' %
849 (minin, minout, minlive, mindead))
851 if len(self.in_osds) > minin:
852 actions.append((self.out_osd, 1.0,))
853 if len(self.live_osds) > minlive and chance_down > 0:
854 actions.append((self.kill_osd, chance_down,))
855 if len(self.dead_osds) > 1:
856 actions.append((self.rm_past_intervals, 1.0,))
857 if len(self.out_osds) > minout:
858 actions.append((self.in_osd, 1.7,))
859 if len(self.dead_osds) > mindead:
860 actions.append((self.revive_osd, 1.0,))
861 if self.config.get('thrash_primary_affinity', True):
862 actions.append((self.primary_affinity, 1.0,))
863 actions.append((self.reweight_osd_or_by_util,
864 self.config.get('reweight_osd', .5),))
865 actions.append((self.grow_pool,
866 self.config.get('chance_pgnum_grow', 0),))
867 actions.append((self.fix_pgp_num,
868 self.config.get('chance_pgpnum_fix', 0),))
869 actions.append((self.test_pool_min_size,
870 chance_test_min_size,))
871 actions.append((self.test_backfill_full,
872 chance_test_backfill_full,))
873 if self.chance_thrash_cluster_full > 0:
874 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
875 if self.chance_thrash_pg_upmap > 0:
876 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
877 if self.chance_thrash_pg_upmap_items > 0:
878 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
879 if self.chance_force_recovery > 0:
880 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
# pause injection scenarios (short/long) for each supported inject key
882 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
885 self.inject_pause(key,
886 self.config.get('pause_short', 3),
889 self.config.get('chance_inject_pause_short', 1),),
891 self.inject_pause(key,
892 self.config.get('pause_long', 80),
893 self.config.get('pause_check_after', 70),
895 self.config.get('chance_inject_pause_long', 0),)]:
896 actions.append(scenario)
# weighted random selection over the accumulated (action, prob) pairs
898 total = sum([y for (x, y) in actions])
899 val = random.uniform(0, total)
900 for (action, prob) in actions:
# Orphaned traceback-logging line from an except block whose context is
# outside this subsampled view, followed by the body of do_sighup: loop
# until stopped, SIGHUP a random live osd, sleep `sighup_delay` each pass
# (the def line and the sleep are missing from this view).
912 self.log(traceback.format_exc())
919 Loops and sends signal.SIGHUP to a random live osd.
921 Loop delay is controlled by the config value sighup_delay.
923 delay = float(self.sighup_delay)
924 self.log("starting do_sighup with a delay of {0}".format(delay))
925 while not self.stopping:
926 osd = random.choice(self.live_osds)
927 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
# Loop until stopped, flipping osd_enable_op_tracker on/off across all osds
# every `optrack_toggle_delay` seconds.
# NOTE(review): the osd_state initialization/toggle and the sleep are missing
# from this subsampled view.
931 def do_optrack_toggle(self):
933 Loops and toggle op tracking to all osds.
935 Loop delay is controlled by the config value optrack_toggle_delay.
937 delay = float(self.optrack_toggle_delay)
939 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
940 while not self.stopping:
941 if osd_state == "true":
945 self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
946 'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
# Loop until stopped, requesting in-flight/blocked/historic op dumps from
# every live osd via the admin socket; errors are ignored because the set of
# live osds changes underneath us.
950 def do_dump_ops(self):
952 Loops and does op dumps on all osds
954 self.log("starting do_dump_ops")
955 while not self.stopping:
956 for osd in self.live_osds:
957 # Ignore errors because live_osds is in flux
958 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
959 check_status=False, timeout=30, stdout=DEVNULL)
960 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
961 check_status=False, timeout=30, stdout=DEVNULL)
962 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
963 check_status=False, timeout=30, stdout=DEVNULL)
# Loop until stopped, cycling the noscrub/nodeep-scrub flags through the
# state machine none -> noscrub -> both -> nodeep-scrub -> none, sleeping
# `noscrub_toggle_delay` per step; both flags are unset on exit.
# NOTE(review): the scrub_state init, two state assignments and the sleep are
# missing from this subsampled view.
967 def do_noscrub_toggle(self):
969 Loops and toggle noscrub flags
971 Loop delay is controlled by the config value noscrub_toggle_delay.
973 delay = float(self.noscrub_toggle_delay)
975 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
976 while not self.stopping:
977 if scrub_state == "none":
978 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
979 scrub_state = "noscrub"
980 elif scrub_state == "noscrub":
981 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
983 elif scrub_state == "both":
984 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
985 scrub_state = "nodeep-scrub"
987 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
# leave the cluster scrubbing normally when the toggler stops
990 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
991 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
# Body of do_thrash (its 'def' line is outside this subsampled view): the
# main loop that periodically brings the cluster clean, optionally tests map
# discontinuity or scrubbing, and otherwise executes choose_action(); on
# exit it clears EIO injection, fixes pending pgp_num and restores any
# config options saved in __init__.
996 Loop to select random actions to thrash ceph manager with.
998 cleanint = self.config.get("clean_interval", 60)
999 scrubint = self.config.get("scrub_interval", -1)
1000 maxdead = self.config.get("max_dead", 0)
1001 delay = self.config.get("op_delay", 5)
# first live osd gets the random read-error injection, if configured
1002 self.rerrosd = self.live_osds[0]
1003 if self.random_eio > 0:
1004 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1005 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
1006 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1007 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
1008 self.log("starting do_thrash")
1009 while not self.stopping:
1010 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1011 "out_osds: ", self.out_osds,
1012 "dead_osds: ", self.dead_osds,
1013 "live_osds: ", self.live_osds]]
1014 self.log(" ".join(to_log))
# with probability delay/cleanint, bring everything back to clean
1015 if random.uniform(0, 1) < (float(delay) / cleanint):
1016 while len(self.dead_osds) > maxdead:
1018 for osd in self.in_osds:
1019 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1021 if random.uniform(0, 1) < float(
1022 self.config.get('chance_test_map_discontinuity', 0)):
1023 self.test_map_discontinuity()
1025 self.ceph_manager.wait_for_recovery(
1026 timeout=self.config.get('timeout')
1028 time.sleep(self.clean_wait)
1030 if random.uniform(0, 1) < (float(delay) / scrubint):
1031 self.log('Scrubbing while thrashing being performed')
# Scrubber kicks off scrubbing as a side effect of construction
1032 Scrubber(self.ceph_manager, self.config)
1033 self.choose_action()()
# ---- shutdown path: undo injections and restore saved options ----
1036 if self.random_eio > 0:
1037 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1038 'injectargs', '--', '--filestore_debug_random_read_err=0.0')
1039 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1040 'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
1041 for pool in list(self.pools_to_fix_pgp_num):
1042 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1043 self.fix_pgp_num(pool)
1044 self.pools_to_fix_pgp_num.clear()
1045 for service, opt, saved_value in self.saved_options:
1046 self._set_config(service, '*', opt, saved_value)
1047 self.saved_options = []
# Helper that runs ceph-objectstore-tool against one osd's store: resolves
# osd ("primary" -> actual primary of object_name), the PG id, the remote
# holding the osd, and the --data-path/--journal-path arguments.
# NOTE(review): subsampled listing -- self.pool assignment and several
# argument lines are missing from this view.
1051 class ObjectStoreTool:
1053 def __init__(self, manager, pool, **kwargs):
1054 self.manager = manager
1056 self.osd = kwargs.get('osd', None)
1057 self.object_name = kwargs.get('object_name', None)
1058 self.do_revive = kwargs.get('do_revive', True)
1059 if self.osd and self.pool and self.object_name:
1060 if self.osd == "primary":
1061 self.osd = self.manager.get_object_primary(self.pool,
1064 if self.object_name:
1065 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
# first remote hosting this osd role
1068 self.remote = self.manager.ctx.\
1069 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
1070 path = self.manager.get_filepath().format(id=self.osd)
1071 self.paths = ("--data-path {path} --journal-path {path}/journal".
1074 def build_cmd(self, options, args, stdin):
1076 if self.object_name:
1077 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1078 "{paths} --pgid {pgid} --op list |"
1079 "grep '\"oid\":\"{name}\"')".
1080 format(paths=self.paths,
1082 name=self.object_name))
1083 args = '"$object" ' + args
1084 options += " --pgid {pgid}".format(pgid=self.pgid)
1085 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1086 format(paths=self.paths,
1090 cmd = ("echo {payload} | base64 --decode | {cmd}".
1091 format(payload=base64.encode(stdin),
1094 return "\n".join(lines)
1096 def run(self, options, args, stdin=None, stdout=None):
1099 self.manager.kill_osd(self.osd)
1100 cmd = self.build_cmd(options, args, stdin)
1101 self.manager.log(cmd)
1103 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1108 if proc.exitstatus != 0:
1109 self.manager.log("failed with " + str(proc.exitstatus))
1110 error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
1111 raise Exception(error)
1114 self.manager.revive_osd(self.osd)
1115 self.manager.wait_till_osd_is_up(self.osd, 300)
1120 Ceph manager object.
1121 Contains several local functions that form a bulk of this module.
1123 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1128 ERASURE_CODED_POOL = 3
1130 def __init__(self, controller, ctx=None, config=None, logger=None,
1132 self.lock = threading.RLock()
1134 self.config = config
1135 self.controller = controller
1136 self.next_pool_id = 0
1137 self.cluster = cluster
1139 self.log = lambda x: logger.info(x)
1143 implement log behavior.
1147 if self.config is None:
1148 self.config = dict()
1149 pools = self.list_pools()
1152 # we may race with a pool deletion; ignore failures here
1154 self.pools[pool] = self.get_pool_property(pool, 'pg_num')
1155 except CommandFailedError:
1156 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1158 def raw_cluster_cmd(self, *args):
1160 Start ceph on a raw cluster. Return count
1162 testdir = teuthology.get_testdir(self.ctx)
1167 '{tdir}/archive/coverage'.format(tdir=testdir),
1174 ceph_args.extend(args)
1175 proc = self.controller.run(
1179 return proc.stdout.getvalue()
1181 def raw_cluster_cmd_result(self, *args):
1183 Start ceph on a cluster. Return success or failure information.
1185 testdir = teuthology.get_testdir(self.ctx)
1190 '{tdir}/archive/coverage'.format(tdir=testdir),
1197 ceph_args.extend(args)
1198 proc = self.controller.run(
1202 return proc.exitstatus
1204 def run_ceph_w(self):
1206 Execute "ceph -w" in the background with stdout connected to a StringIO,
1207 and return the RemoteProcess.
1209 return self.controller.run(
1217 wait=False, stdout=StringIO(), stdin=run.PIPE)
def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
    """
    Flush pg stats from a list of OSD ids, ensuring they are reflected
    all the way to the monitor. Luminous and later only.

    :param osds: list of OSDs to flush
    :param no_wait: list of OSDs not to wait for seq id. by default, we
                    wait for all specified osds, but some of them could be
                    moved out of osdmap, so we cannot get their updated
                    stat seq from monitor anymore. in that case, you need
                    to pass a blacklist.
    :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
                         it. (5 min by default)
    """
    # ask each osd to flush and remember the stat seq it reports back
    seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
    if not wait_for_mon:
    # NOTE(review): iteritems() is Python 2 only (items() on Python 3)
    for osd, need in seq.iteritems():
        while wait_for_mon > 0:
            # poll the monitor until its last-stat-seq catches up to `need`
            got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
            self.log('need seq {need} got {got} for osd.{osd}'.format(
                need=need, got=got, osd=osd))
            wait_for_mon -= A_WHILE
        raise Exception('timed out waiting for mon to be updated with '
                        'osd.{osd}: {got} < {need}'.
                        format(osd=osd, got=got, need=need))
def flush_all_pg_stats(self):
    """Flush pg stats for every osd id present in the osd dump."""
    osd_ids = range(len(self.get_osd_dump()))
    self.flush_pg_stats(osd_ids)
1260 def do_rados(self, remote, cmd, check_status=True):
1262 Execute a remote rados command.
1264 testdir = teuthology.get_testdir(self.ctx)
1268 '{tdir}/archive/coverage'.format(tdir=testdir),
1277 check_status=check_status
1281 def rados_write_objects(self, pool, num_objects, size,
1282 timelimit, threads, cleanup=False):
1285 Threads not used yet.
1289 '--num-objects', num_objects,
1295 args.append('--no-cleanup')
1296 return self.do_rados(self.controller, map(str, args))
1298 def do_put(self, pool, obj, fname, namespace=None):
1300 Implement rados put operation
1303 if namespace is not None:
1304 args += ['-N', namespace]
1310 return self.do_rados(
1316 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1318 Implement rados get operation
1321 if namespace is not None:
1322 args += ['-N', namespace]
1328 return self.do_rados(
1334 def do_rm(self, pool, obj, namespace=None):
1336 Implement rados rm operation
1339 if namespace is not None:
1340 args += ['-N', namespace]
1345 return self.do_rados(
def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
    """Forward an admin-socket command to the given OSD."""
    return self.admin_socket(
        'osd', osd_id, command, check_status, timeout, stdout)
def find_remote(self, service_type, service_id):
    """
    Look up the Remote for the host where a particular service runs.

    :param service_type: 'mds', 'osd', 'client', ...
    :param service_id: the second part of a role, e.g. '0'
    :return: a Remote instance for the host hosting the requested role
    """
    return get_remote(
        self.ctx, self.cluster, service_type, service_id)
1369 def admin_socket(self, service_type, service_id,
1370 command, check_status=True, timeout=0, stdout=None):
1372 Remotely start up ceph specifying the admin socket
1373 :param command: a list of words to use as the command
1378 testdir = teuthology.get_testdir(self.ctx)
1379 remote = self.find_remote(service_type, service_id)
1384 '{tdir}/archive/coverage'.format(tdir=testdir),
1391 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1392 cluster=self.cluster,
1396 args.extend(command)
1401 check_status=check_status
def objectstore_tool(self, pool, options, args, **kwargs):
    """Run ceph-objectstore-tool against *pool* through an ObjectStoreTool helper."""
    tool = ObjectStoreTool(self, pool, **kwargs)
    return tool.run(options, args)
1407 def get_pgid(self, pool, pgnum):
1409 :param pool: pool name
1410 :param pgnum: pg number
1411 :returns: a string representing this pg.
1413 poolnum = self.get_pool_num(pool)
1414 pg_str = "{poolnum}.{pgnum}".format(
def get_pg_replica(self, pool, pgnum):
    """Return the last (replica) OSD in the acting set of (pool, pgnum)."""
    pgid = self.get_pgid(pool, pgnum)
    raw = self.raw_cluster_cmd("pg", "map", pgid, '--format=json')
    # the first output line is a human-readable banner, not JSON
    payload = raw.split('\n')[1:]
    parsed = json.loads('\n'.join(payload))
    return int(parsed['acting'][-1])
1429 def wait_for_pg_stats(func):
1430 # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
1431 # by default, and take the faulty injection in ms into consideration,
1432 # 12 seconds are more than enough
1433 delays = [1, 1, 2, 3, 5, 8, 13]
1435 def wrapper(self, *args, **kwargs):
1437 for delay in delays:
1439 return func(self, *args, **kwargs)
1440 except AssertionError as e:
def get_pg_primary(self, pool, pgnum):
    """Return the primary (first acting) OSD of (pool, pgnum)."""
    pgid = self.get_pgid(pool, pgnum)
    raw = self.raw_cluster_cmd("pg", "map", pgid, '--format=json')
    # drop the non-JSON banner line before parsing
    payload = raw.split('\n')[1:]
    parsed = json.loads('\n'.join(payload))
    return int(parsed['acting'][0])
def get_pool_num(self, pool):
    """Map a pool name to its numeric pool id (e.g. data -> 2)."""
    dump = self.get_pool_dump(pool)
    return int(dump['pool'])
def list_pools(self):
    """Return the names of all pools currently in the cluster."""
    dump = self.get_osd_dump_json()
    self.log(dump['pools'])
    return [str(entry['pool_name']) for entry in dump['pools']]
def clear_pools(self):
    """
    Remove every pool in the cluster.

    Previously written as a list comprehension executed purely for its
    side effects; a plain loop states the intent and avoids building a
    throwaway list of Nones.
    """
    for pool in self.list_pools():
        self.remove_pool(pool)
1476 def kick_recovery_wq(self, osdnum):
1478 Run kick_recovery_wq on cluster.
1480 return self.raw_cluster_cmd(
1481 'tell', "osd.%d" % (int(osdnum),),
# NOTE(review): `args=['version']` is a mutable default argument; safe only
# as long as no caller mutates it. Prefer a tuple or None-sentinel.
def wait_run_admin_socket(self, service_type,
                          service_id, args=['version'], timeout=75, stdout=None):
    """
    If osd_admin_socket call suceeds, return. Otherwise wait
    five seconds and try again.
    """
    proc = self.admin_socket(service_type, service_id,
                             args, check_status=False, stdout=stdout)
    # NOTE(review): `is 0` relies on CPython small-int interning; this
    # should be `proc.exitstatus == 0`.
    if proc.exitstatus is 0:
    # give up once the retries (5s apart) have exceeded the timeout
    if (tries * 5) > timeout:
        raise Exception('timed out waiting for admin_socket '
                        'to appear after {type}.{id} restart'.
                        format(type=service_type,
    self.log("waiting on admin_socket for {type}-{id}, "
             "{command}".format(type=service_type,
1513 def get_pool_dump(self, pool):
1515 get the osd dump part of a pool
1517 osd_dump = self.get_osd_dump_json()
1518 for i in osd_dump['pools']:
1519 if i['pool_name'] == pool:
1523 def get_config(self, service_type, service_id, name):
1525 :param node: like 'mon.a'
1526 :param name: the option name
1528 proc = self.wait_run_admin_socket(service_type, service_id,
1530 j = json.loads(proc.stdout.getvalue())
1533 def set_config(self, osdnum, **argdict):
1535 :param osdnum: osd number
1536 :param argdict: dictionary containing values to set.
1538 for k, v in argdict.iteritems():
1539 self.wait_run_admin_socket(
1541 ['config', 'set', str(k), str(v)])
1543 def raw_cluster_status(self):
1545 Get status from cluster
1547 status = self.raw_cluster_cmd('status', '--format=json-pretty')
1548 return json.loads(status)
1550 def raw_osd_status(self):
1552 Get osd status from cluster
1554 return self.raw_cluster_cmd('osd', 'dump')
1556 def get_osd_status(self):
1558 Get osd statuses sorted by states that the osds are in.
1561 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1562 self.raw_osd_status().split('\n'))
1564 in_osds = [int(i[4:].split()[0])
1565 for i in filter(lambda x: " in " in x, osd_lines)]
1566 out_osds = [int(i[4:].split()[0])
1567 for i in filter(lambda x: " out " in x, osd_lines)]
1568 up_osds = [int(i[4:].split()[0])
1569 for i in filter(lambda x: " up " in x, osd_lines)]
1570 down_osds = [int(i[4:].split()[0])
1571 for i in filter(lambda x: " down " in x, osd_lines)]
1572 dead_osds = [int(x.id_)
1573 for x in filter(lambda x:
1576 iter_daemons_of_role('osd', self.cluster))]
1577 live_osds = [int(x.id_) for x in
1580 self.ctx.daemons.iter_daemons_of_role('osd',
1582 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1583 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1586 def get_num_pgs(self):
1588 Check cluster status for the number of pgs
1590 status = self.raw_cluster_status()
1592 return status['pgmap']['num_pgs']
1594 def create_erasure_code_profile(self, profile_name, profile):
1596 Create an erasure code profile name that can be used as a parameter
1597 when creating an erasure coded pool.
1600 args = cmd_erasure_code_profile(profile_name, profile)
1601 self.raw_cluster_cmd(*args)
1603 def create_pool_with_unique_name(self, pg_num=16,
1604 erasure_code_profile_name=None,
1606 erasure_code_use_overwrites=False):
1608 Create a pool named unique_pool_X where X is unique.
1612 name = "unique_pool_%s" % (str(self.next_pool_id),)
1613 self.next_pool_id += 1
1617 erasure_code_profile_name=erasure_code_profile_name,
1619 erasure_code_use_overwrites=erasure_code_use_overwrites)
1622 @contextlib.contextmanager
1623 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1624 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1626 self.remove_pool(pool_name)
def create_pool(self, pool_name, pg_num=16,
                erasure_code_profile_name=None,
                erasure_code_use_overwrites=False):
    """
    Create a pool named from the pool_name parameter.
    :param pool_name: name of the pool being created.
    :param pg_num: initial number of pgs.
    :param erasure_code_profile_name: if set and !None create an
                                      erasure coded pool using the profile
    :param erasure_code_use_overwrites: if true, allow overwrites
    """
    # NOTE(review): a `min_size` keyword parameter is read below but its
    # declaration line is not visible in this fragment -- confirm signature.
    # NOTE(review): `basestring` is Python 2 only.
    assert isinstance(pool_name, basestring)
    assert isinstance(pg_num, int)
    assert pool_name not in self.pools
    self.log("creating pool_name %s" % (pool_name,))
    if erasure_code_profile_name:
        self.raw_cluster_cmd('osd', 'pool', 'create',
                             pool_name, str(pg_num), str(pg_num),
                             'erasure', erasure_code_profile_name)
        # NOTE(review): the `else:` introducing this replicated-pool branch
        # is not visible in this fragment.
        self.raw_cluster_cmd('osd', 'pool', 'create',
                             pool_name, str(pg_num))
    if min_size is not None:
        self.raw_cluster_cmd(
            'osd', 'pool', 'set', pool_name,
    if erasure_code_use_overwrites:
        self.raw_cluster_cmd(
            'osd', 'pool', 'set', pool_name,
            'allow_ec_overwrites',
    # tag the pool with the rados application; '|| true' tolerates
    # releases that do not support `osd pool application enable`
    self.raw_cluster_cmd(
        'osd', 'pool', 'application', 'enable',
        pool_name, 'rados', '--yes-i-really-mean-it',
        run.Raw('||'), 'true')
    # remember the pg count locally for later expand/pgp bookkeeping
    self.pools[pool_name] = pg_num
def add_pool_snap(self, pool_name, snap_name):
    """
    Take a snapshot of a pool.
    :param pool_name: name of pool to snapshot
    :param snap_name: name of snapshot to take
    """
    cmd = ('osd', 'pool', 'mksnap', str(pool_name), str(snap_name))
    self.raw_cluster_cmd(*cmd)
def remove_pool_snap(self, pool_name, snap_name):
    """
    Remove a pool snapshot.
    :param pool_name: name of pool to snapshot
    :param snap_name: name of snapshot to remove
    """
    cmd = ('osd', 'pool', 'rmsnap', str(pool_name), str(snap_name))
    self.raw_cluster_cmd(*cmd)
1687 def remove_pool(self, pool_name):
1689 Remove the indicated pool
1690 :param pool_name: Pool to be removed
1693 assert isinstance(pool_name, basestring)
1694 assert pool_name in self.pools
1695 self.log("removing pool_name %s" % (pool_name,))
1696 del self.pools[pool_name]
1697 self.do_rados(self.controller,
1698 ['rmpool', pool_name, pool_name,
1699 "--yes-i-really-really-mean-it"])
1706 return random.choice(self.pools.keys())
1708 def get_pool_pg_num(self, pool_name):
1710 Return the number of pgs in the pool specified.
1713 assert isinstance(pool_name, basestring)
1714 if pool_name in self.pools:
1715 return self.pools[pool_name]
1718 def get_pool_property(self, pool_name, prop):
1720 :param pool_name: pool
1721 :param prop: property to be checked.
1722 :returns: property as an int value.
1725 assert isinstance(pool_name, basestring)
1726 assert isinstance(prop, basestring)
1727 output = self.raw_cluster_cmd(
1733 return int(output.split()[1])
1735 def set_pool_property(self, pool_name, prop, val):
1737 :param pool_name: pool
1738 :param prop: property to be set.
1739 :param val: value to set.
1741 This routine retries if set operation fails.
1744 assert isinstance(pool_name, basestring)
1745 assert isinstance(prop, basestring)
1746 assert isinstance(val, int)
1749 r = self.raw_cluster_cmd_result(
1756 if r != 11: # EAGAIN
1760 raise Exception('timed out getting EAGAIN '
1761 'when setting pool property %s %s = %s' %
1762 (pool_name, prop, val))
1763 self.log('got EAGAIN setting pool property, '
1764 'waiting a few seconds...')
1767 def expand_pool(self, pool_name, by, max_pgs):
1769 Increase the number of pgs in a pool
1772 assert isinstance(pool_name, basestring)
1773 assert isinstance(by, int)
1774 assert pool_name in self.pools
1775 if self.get_num_creating() > 0:
1777 if (self.pools[pool_name] + by) > max_pgs:
1779 self.log("increase pool size by %d" % (by,))
1780 new_pg_num = self.pools[pool_name] + by
1781 self.set_pool_property(pool_name, "pg_num", new_pg_num)
1782 self.pools[pool_name] = new_pg_num
1785 def set_pool_pgpnum(self, pool_name, force):
1787 Set pgpnum property of pool_name pool.
1790 assert isinstance(pool_name, basestring)
1791 assert pool_name in self.pools
1792 if not force and self.get_num_creating() > 0:
1794 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
1797 def list_pg_missing(self, pgid):
1799 return list of missing pgs with the id specified
1804 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
1810 r['objects'].extend(j['objects'])
1815 offset = j['objects'][-1]['oid']
def get_pg_stats(self):
    """Run 'pg dump --format=json' and return the parsed pg_stats list."""
    raw = self.raw_cluster_cmd('pg', 'dump', '--format=json')
    # skip the leading non-JSON banner line
    payload = raw.split('\n')[1:]
    dump = json.loads('\n'.join(payload))
    return dump['pg_stats']
1828 def get_pgids_to_force(self, backfill):
1830 Return the randomized list of PGs that can have their recovery/backfill forced
1832 j = self.get_pg_stats();
1835 wanted = ['degraded', 'backfilling', 'backfill_wait']
1837 wanted = ['recovering', 'degraded', 'recovery_wait']
1839 status = pg['state'].split('+')
1841 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
1842 pgids.append(pg['pgid'])
1846 def get_pgids_to_cancel_force(self, backfill):
1848 Return the randomized list of PGs whose recovery/backfill priority is forced
1850 j = self.get_pg_stats();
1853 wanted = 'forced_backfill'
1855 wanted = 'forced_recovery'
1857 status = pg['state'].split('+')
1858 if wanted in status and random.random() > 0.5:
1859 pgids.append(pg['pgid'])
1862 def compile_pg_status(self):
1864 Return a histogram of pg state values
1867 j = self.get_pg_stats()
1869 for status in pg['state'].split('+'):
1870 if status not in ret:
def with_pg_state(self, pool, pgnum, check):
    """Assert that predicate *check* holds for the state string of (pool, pgnum)."""
    pg_id = self.get_pgid(pool, pgnum)
    state = self.get_single_pg_stats(pg_id)['state']
    assert check(state)
1882 def with_pg(self, pool, pgnum, check):
1883 pgstr = self.get_pgid(pool, pgnum)
1884 stats = self.get_single_pg_stats(pgstr)
def get_last_scrub_stamp(self, pool, pgnum):
    """Return the last_scrub_stamp recorded for pg (pool, pgnum)."""
    pg_id = self.get_pgid(pool, pgnum)
    return self.get_single_pg_stats(pg_id)["last_scrub_stamp"]
1894 def do_pg_scrub(self, pool, pgnum, stype):
1896 Scrub pg and wait for scrubbing to finish
1898 init = self.get_last_scrub_stamp(pool, pgnum)
1899 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
1900 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
1903 while init == self.get_last_scrub_stamp(pool, pgnum):
1904 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
1905 self.log("waiting for scrub type %s" % (stype,))
1906 if (timer % RESEND_TIMEOUT) == 0:
1907 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
1908 # The first time in this loop is the actual request
1909 if timer != 0 and stype == "repair":
1910 self.log("WARNING: Resubmitted a non-idempotent repair")
1911 time.sleep(SLEEP_TIME)
1914 def wait_snap_trimming_complete(self, pool):
1916 Wait for snap trimming on pool to end
1921 poolnum = self.get_pool_num(pool)
1922 poolnumstr = "%s." % (poolnum,)
1925 if (now - start) > FATAL_TIMEOUT:
1926 assert (now - start) < FATAL_TIMEOUT, \
1927 'failed to complete snap trimming before timeout'
1928 all_stats = self.get_pg_stats()
1930 for pg in all_stats:
1931 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
1932 self.log("pg {pg} in trimming, state: {state}".format(
1938 self.log("{pool} still trimming, waiting".format(pool=pool))
1939 time.sleep(POLL_PERIOD)
1941 def get_single_pg_stats(self, pgid):
1943 Return pg for the pgid specified.
1945 all_stats = self.get_pg_stats()
1947 for pg in all_stats:
1948 if pg['pgid'] == pgid:
1953 def get_object_pg_with_shard(self, pool, name, osdid):
1956 pool_dump = self.get_pool_dump(pool)
1957 object_map = self.get_object_map(pool, name)
1958 if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
1959 shard = object_map['acting'].index(osdid)
1960 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
1963 return object_map['pgid']
1965 def get_object_primary(self, pool, name):
1968 object_map = self.get_object_map(pool, name)
1969 return object_map['acting_primary']
def get_object_map(self, pool, name):
    """
    Run 'osd map' for *name* in *pool*, parsed from JSON.
    :returns: the python object
    """
    raw = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
    # first output line is a banner, not JSON
    body = raw.split('\n')[1:]
    return json.loads('\n'.join(body))
def get_osd_dump_json(self):
    """
    Run 'osd dump --format=json', parsed from JSON.
    :returns: the python object
    """
    raw = self.raw_cluster_cmd('osd', 'dump', '--format=json')
    # drop the banner line before the JSON payload
    body = raw.split('\n')[1:]
    return json.loads('\n'.join(body))
def get_osd_dump(self):
    """Return just the 'osds' list from the full osd dump."""
    dump = self.get_osd_dump_json()
    return dump['osds']
def get_mgr_dump(self):
    """Return 'mgr dump' parsed from JSON."""
    raw = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
    return json.loads(raw)
1998 def get_stuck_pgs(self, type_, threshold):
2000 :returns: stuck pg information from the cluster
2002 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2004 return json.loads(out)
2006 def get_num_unfound_objects(self):
2008 Check cluster status to get the number of unfound objects
2010 status = self.raw_cluster_status()
2012 return status['pgmap'].get('unfound_objects', 0)
2014 def get_num_creating(self):
2016 Find the number of pgs in creating mode.
2018 pgs = self.get_pg_stats()
2021 if 'creating' in pg['state']:
2025 def get_num_active_clean(self):
2027 Find the number of active and clean pgs.
2029 pgs = self.get_pg_stats()
2032 if (pg['state'].count('active') and
2033 pg['state'].count('clean') and
2034 not pg['state'].count('stale')):
2038 def get_num_active_recovered(self):
2040 Find the number of active and recovered pgs.
2042 pgs = self.get_pg_stats()
2045 if (pg['state'].count('active') and
2046 not pg['state'].count('recover') and
2047 not pg['state'].count('backfilling') and
2048 not pg['state'].count('stale')):
def get_is_making_recovery_progress(self):
    """Return True when any recovery rate counter in the pgmap is nonzero."""
    pgmap = self.raw_cluster_status()['pgmap']
    rates = (pgmap.get('recovering_keys_per_sec', 0),
             pgmap.get('recovering_bytes_per_sec', 0),
             pgmap.get('recovering_objects_per_sec', 0))
    return any(rate > 0 for rate in rates)
2063 def get_num_active(self):
2065 Find the number of active pgs.
2067 pgs = self.get_pg_stats()
2070 if pg['state'].count('active') and not pg['state'].count('stale'):
2074 def get_num_down(self):
2076 Find the number of pgs that are down.
2078 pgs = self.get_pg_stats()
2081 if ((pg['state'].count('down') and not
2082 pg['state'].count('stale')) or
2083 (pg['state'].count('incomplete') and not
2084 pg['state'].count('stale'))):
2088 def get_num_active_down(self):
2090 Find the number of pgs that are either active or down.
2092 pgs = self.get_pg_stats()
2095 if ((pg['state'].count('active') and not
2096 pg['state'].count('stale')) or
2097 (pg['state'].count('down') and not
2098 pg['state'].count('stale')) or
2099 (pg['state'].count('incomplete') and not
2100 pg['state'].count('stale'))):
2106 True if all pgs are clean
2108 return self.get_num_active_clean() == self.get_num_pgs()
def is_recovered(self):
    """Whether every pg in the cluster has finished recovery."""
    recovered = self.get_num_active_recovered()
    total = self.get_num_pgs()
    return recovered == total
def is_active_or_down(self):
    """Whether every pg is either active or down."""
    active_down = self.get_num_active_down()
    total = self.get_num_pgs()
    return active_down == total
2122 def wait_for_clean(self, timeout=None):
2124 Returns true when all pgs are clean.
2126 self.log("waiting for clean")
2128 num_active_clean = self.get_num_active_clean()
2129 while not self.is_clean():
2130 if timeout is not None:
2131 if self.get_is_making_recovery_progress():
2132 self.log("making progress, resetting timeout")
2135 self.log("no progress seen, keeping timeout for now")
2136 if time.time() - start >= timeout:
2137 self.log('dumping pgs')
2138 out = self.raw_cluster_cmd('pg', 'dump')
2140 assert time.time() - start < timeout, \
2141 'failed to become clean before timeout expired'
2142 cur_active_clean = self.get_num_active_clean()
2143 if cur_active_clean != num_active_clean:
2145 num_active_clean = cur_active_clean
def are_all_osds_up(self):
    """Return True when every osd in the osd dump reports itself up."""
    osds = self.get_osd_dump()
    return all(osd['up'] > 0 for osd in osds)
2156 def wait_for_all_osds_up(self, timeout=None):
2158 When this exits, either the timeout has expired, or all
2161 self.log("waiting for all up")
2163 while not self.are_all_osds_up():
2164 if timeout is not None:
2165 assert time.time() - start < timeout, \
2166 'timeout expired in wait_for_all_osds_up'
2170 def pool_exists(self, pool):
2171 if pool in self.list_pools():
2175 def wait_for_pool(self, pool, timeout=300):
2177 Wait for a pool to exist
2179 self.log('waiting for pool %s to exist' % pool)
2181 while not self.pool_exists(pool):
2182 if timeout is not None:
2183 assert time.time() - start < timeout, \
2184 'timeout expired in wait_for_pool'
2187 def wait_for_pools(self, pools):
2189 self.wait_for_pool(pool)
def is_mgr_available(self):
    """Return whether the mgr dump reports an available active mgr."""
    dump = self.get_mgr_dump()
    return dump.get('available', False)
2195 def wait_for_mgr_available(self, timeout=None):
2196 self.log("waiting for mgr available")
2198 while not self.is_mgr_available():
2199 if timeout is not None:
2200 assert time.time() - start < timeout, \
2201 'timeout expired in wait_for_mgr_available'
2203 self.log("mgr available!")
2205 def wait_for_recovery(self, timeout=None):
2207 Check peering. When this exists, we have recovered.
2209 self.log("waiting for recovery to complete")
2211 num_active_recovered = self.get_num_active_recovered()
2212 while not self.is_recovered():
2214 if timeout is not None:
2215 if self.get_is_making_recovery_progress():
2216 self.log("making progress, resetting timeout")
2219 self.log("no progress seen, keeping timeout for now")
2220 if now - start >= timeout:
2221 if self.is_recovered():
2223 self.log('dumping pgs')
2224 out = self.raw_cluster_cmd('pg', 'dump')
2226 assert now - start < timeout, \
2227 'failed to recover before timeout expired'
2228 cur_active_recovered = self.get_num_active_recovered()
2229 if cur_active_recovered != num_active_recovered:
2231 num_active_recovered = cur_active_recovered
2233 self.log("recovered!")
2235 def wait_for_active(self, timeout=None):
2237 Check peering. When this exists, we are definitely active
2239 self.log("waiting for peering to complete")
2241 num_active = self.get_num_active()
2242 while not self.is_active():
2243 if timeout is not None:
2244 if time.time() - start >= timeout:
2245 self.log('dumping pgs')
2246 out = self.raw_cluster_cmd('pg', 'dump')
2248 assert time.time() - start < timeout, \
2249 'failed to recover before timeout expired'
2250 cur_active = self.get_num_active()
2251 if cur_active != num_active:
2253 num_active = cur_active
2257 def wait_for_active_or_down(self, timeout=None):
2259 Check peering. When this exists, we are definitely either
2262 self.log("waiting for peering to complete or become blocked")
2264 num_active_down = self.get_num_active_down()
2265 while not self.is_active_or_down():
2266 if timeout is not None:
2267 if time.time() - start >= timeout:
2268 self.log('dumping pgs')
2269 out = self.raw_cluster_cmd('pg', 'dump')
2271 assert time.time() - start < timeout, \
2272 'failed to recover before timeout expired'
2273 cur_active_down = self.get_num_active_down()
2274 if cur_active_down != num_active_down:
2276 num_active_down = cur_active_down
2278 self.log("active or down!")
def osd_is_up(self, osd):
    """Check the osd dump to see whether osd.{osd} is marked up."""
    dump = self.get_osd_dump()
    return dump[osd]['up'] > 0
2287 def wait_till_osd_is_up(self, osd, timeout=None):
2289 Loop waiting for osd.
2291 self.log('waiting for osd.%d to be up' % osd)
2293 while not self.osd_is_up(osd):
2294 if timeout is not None:
2295 assert time.time() - start < timeout, \
2296 'osd.%d failed to come up before timeout expired' % osd
2298 self.log('osd.%d is up' % osd)
def is_active(self):
    """Whether every pg in the cluster is active."""
    active = self.get_num_active()
    total = self.get_num_pgs()
    return active == total
2306 def wait_till_active(self, timeout=None):
2308 Wait until all pgs are active.
2310 self.log("waiting till active")
2312 while not self.is_active():
2313 if timeout is not None:
2314 if time.time() - start >= timeout:
2315 self.log('dumping pgs')
2316 out = self.raw_cluster_cmd('pg', 'dump')
2318 assert time.time() - start < timeout, \
2319 'failed to become active before timeout expired'
2323 def wait_till_pg_convergence(self, timeout=None):
2326 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2327 if osd['in'] and osd['up']]
2329 # strictly speaking, no need to wait for mon. but due to the
2330 # "ms inject socket failures" setting, the osdmap could be delayed,
2331 # so mgr is likely to ignore the pg-stat messages with pgs serving
2332 # newly created pools which is not yet known by mgr. so, to make sure
2333 # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
2335 self.flush_pg_stats(active_osds)
2336 new_stats = dict((stat['pgid'], stat['state'])
2337 for stat in self.get_pg_stats())
2338 if old_stats == new_stats:
2340 if timeout is not None:
2341 assert time.time() - start < timeout, \
2342 'failed to reach convergence before %d secs' % timeout
2343 old_stats = new_stats
2344 # longer than mgr_stats_period
def mark_out_osd(self, osd):
    """Take osd.{osd} out of the data distribution ('ceph osd out')."""
    cmd = ('osd', 'out', str(osd))
    self.raw_cluster_cmd(*cmd)
def kill_osd(self, osd):
    """
    Kill osds by either power cycling (if indicated by the config)
    or by stopping the daemon; may instead inject a bdev crash.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('osd', osd)
        self.log('kill_osd on osd.{o} '
                 'doing powercycle of {s}'.format(o=osd, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_off()
    elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
        # probabilistically crash the block device layer rather than
        # performing a clean daemon stop
        if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
            self.raw_cluster_cmd(
                '--', 'tell', 'osd.%d' % osd,
                '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
            # wait for the daemon to die from the injected crash
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
            raise RuntimeError('osd.%s did not fail' % osd)
            # NOTE(review): branch structure here is incomplete in this
            # fragment -- the fallbacks below are normal daemon stops
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2383 def _assert_ipmi(remote):
2384 assert remote.console.has_ipmi_credentials, (
2385 "powercycling requested but RemoteConsole is not "
2386 "initialized. Check ipmi config.")
2388 def blackhole_kill_osd(self, osd):
2390 Stop osd if nothing else works.
2392 self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
2394 '--objectstore-blackhole')
2396 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
def revive_osd(self, osd, timeout=360, skip_admin_check=False):
    """
    Revive osds by either power cycling (if indicated by the config)
    or by restarting the daemon.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('osd', osd)
        # NOTE(review): the message says 'kill_osd' although this is the
        # revive path -- looks like a copy-paste; runtime string left as-is.
        self.log('kill_osd on osd.{o} doing powercycle of {s}'.
                 format(o=osd, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        if not remote.console.check_status(300):
            raise Exception('Failed to revive osd.{o} via ipmi'.
        # re-establish ssh and remount the osd's data before restarting
        teuthology.reconnect(self.ctx, 60, [remote])
        mount_osd_data(self.ctx, remote, self.cluster, str(osd))
        self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
    self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

    if not skip_admin_check:
        # wait for dump_ops_in_flight; this command doesn't appear
        # until after the signal handler is installed and it is safe
        # to stop the osd again without making valgrind leak checks
        # unhappy. see #5924.
        self.wait_run_admin_socket('osd', osd,
                                   args=['dump_ops_in_flight'],
                                   timeout=timeout, stdout=DEVNULL)
def mark_down_osd(self, osd):
    """Mark osd.{osd} down ('ceph osd down')."""
    cmd = ('osd', 'down', str(osd))
    self.raw_cluster_cmd(*cmd)
def mark_in_osd(self, osd):
    """Mark osd.{osd} back in ('ceph osd in')."""
    cmd = ('osd', 'in', str(osd))
    self.raw_cluster_cmd(*cmd)
def signal_osd(self, osd, sig, silent=False):
    """
    Send signal *sig* to the daemon running osd.{osd} on this cluster.
    """
    daemon = self.ctx.daemons.get_daemon('osd', osd, self.cluster)
    daemon.signal(sig, silent=silent)
def signal_mon(self, mon, sig, silent=False):
    """
    Send signal *sig* to the daemon running mon.{mon} on this cluster.
    """
    daemon = self.ctx.daemons.get_daemon('mon', mon, self.cluster)
    daemon.signal(sig, silent=silent)
def kill_mon(self, mon):
    """
    Kill the monitor: power cycle its node when the config asks for it,
    otherwise just stop the daemon.
    """
    if not self.config.get('powercycle'):
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
        return
    remote = self.find_remote('mon', mon)
    self.log('kill_mon on mon.{m} doing powercycle of {s}'.
             format(m=mon, s=remote.name))
    self._assert_ipmi(remote)
    remote.console.power_off()
def revive_mon(self, mon):
    """
    Restart the monitor, power cycling its node first when the config
    asks for it.
    """
    if self.config.get('powercycle'):
        node = self.find_remote('mon', mon)
        self.log('revive_mon on mon.{m} doing powercycle of {s}'.
                 format(m=mon, s=node.name))
        self._assert_ipmi(node)
        node.console.power_on()
        self.make_admin_daemon_dir(node)
    self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
def revive_mgr(self, mgr):
    """
    Restart the manager, power cycling its node first when the config
    asks for it.
    """
    if self.config.get('powercycle'):
        node = self.find_remote('mgr', mgr)
        self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
                 format(m=mgr, s=node.name))
        self._assert_ipmi(node)
        node.console.power_on()
        self.make_admin_daemon_dir(node)
    self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
def get_mon_status(self, mon):
    """
    Query the named monitor directly (``-m <addr>``) and return its
    decoded mon_status JSON.
    """
    conf = self.ctx.ceph[self.cluster].conf
    addr = conf['mon.%s' % mon]['mon addr']
    raw = self.raw_cluster_cmd('-m', addr, 'mon_status')
    return json.loads(raw)
def get_mon_quorum(self):
    """
    Extract monitor quorum information from the cluster and return the
    list of ranks currently in quorum.
    """
    raw = self.raw_cluster_cmd('quorum_status')
    status = json.loads(raw)
    self.log('quorum_status is %s' % raw)
    return status['quorum']
def wait_for_mon_quorum_size(self, size, timeout=300):
    """
    Poll until the monitor quorum reaches the requested size.

    :param size: expected number of monitors in quorum
    :param timeout: seconds before asserting failure; None waits forever
    """
    self.log('waiting for quorum size %d' % size)
    start = time.time()
    while len(self.get_mon_quorum()) != size:
        if timeout is not None:
            assert time.time() - start < timeout, \
                ('failed to reach quorum size %d '
                 'before timeout expired' % size)
        time.sleep(3)
    self.log("quorum is size %d" % size)
def get_mon_health(self, debug=False):
    """
    Return the cluster health report, decoded from JSON; optionally log
    the raw output when ``debug`` is set.
    """
    raw = self.raw_cluster_cmd('health', '--format=json')
    if debug:
        self.log('health:\n{h}'.format(h=raw))
    return json.loads(raw)
def get_mds_status(self, mds):
    """
    Look up the info block for the named mds from ``mds dump``.

    :param mds: mds name
    :returns: the matching info dict, or None if the name is not found
    """
    out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
    # first line of 'mds dump' output is a human-readable header; drop it
    j = json.loads(' '.join(out.splitlines()[1:]))
    # collate; for dup ids, larger gid wins.
    # BUGFIX: .itervalues() is Python-2-only; .values() works on both.
    for info in j['info'].values():
        if info['name'] == mds:
            return info
    return None
def get_filepath(self):
    """
    Return the osd data path template for this cluster; the caller
    substitutes the daemon id for the literal ``{id}`` placeholder.
    """
    return '/var/lib/ceph/osd/{0}-{{id}}'.format(self.cluster)
def make_admin_daemon_dir(self, remote):
    """
    Create a world-writable /var/run/ceph directory on the remote site.

    :param remote: Remote site
    """
    cmd = ['sudo', 'install', '-d', '-m0777', '--', '/var/run/ceph']
    remote.run(args=cmd)
def utility_task(name):
    """
    Build a teuthology subtask that forwards to the named manager
    method on the configured cluster's CephManager.
    """
    def task(ctx, config):
        config = config or {}
        positional = config.get('args', [])
        keyword = config.get('kwargs', {})
        cluster = config.get('cluster', 'ceph')
        method = getattr(ctx.managers[cluster], name)
        method(*positional, **keyword)
    return task
# Expose selected CephManager methods as standalone teuthology tasks.
revive_osd = utility_task("revive_osd")
revive_mon = utility_task("revive_mon")
kill_osd = utility_task("kill_osd")
kill_mon = utility_task("kill_mon")
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
wait_for_clean = utility_task("wait_for_clean")
flush_all_pg_stats = utility_task("flush_all_pg_stats")
set_pool_property = utility_task("set_pool_property")
do_pg_scrub = utility_task("do_pg_scrub")
wait_for_pool = utility_task("wait_for_pool")
wait_for_pools = utility_task("wait_for_pools")