Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / ceph_manager.py
1 """
2 ceph manager -- Thrasher and CephManager objects
3 """
4 from cStringIO import StringIO
5 from functools import wraps
6 import contextlib
7 import random
8 import signal
9 import time
10 import gevent
11 import base64
12 import json
13 import logging
14 import threading
15 import traceback
16 import os
17 from teuthology import misc as teuthology
18 from tasks.scrub import Scrubber
19 from util.rados import cmd_erasure_code_profile
20 from util import get_remote
21 from teuthology.contextutil import safe_while
22 from teuthology.orchestra.remote import Remote
23 from teuthology.orchestra import run
24 from teuthology.exceptions import CommandFailedError
25
26 try:
27     from subprocess import DEVNULL # py3k
28 except ImportError:
29     DEVNULL = open(os.devnull, 'r+')
30
31 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33 log = logging.getLogger(__name__)
34
35
36 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
37     conf_fp = StringIO()
38     ctx.ceph[cluster].conf.write(conf_fp)
39     conf_fp.seek(0)
40     writes = ctx.cluster.run(
41         args=[
42             'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
43             'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
44             'sudo', 'python',
45             '-c',
46             ('import shutil, sys; '
47              'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
48             conf_path,
49             run.Raw('&&'),
50             'sudo', 'chmod', '0644', conf_path,
51         ],
52         stdin=run.PIPE,
53         wait=False)
54     teuthology.feed_many_stdins_and_close(conf_fp, writes)
55     run.wait(writes)
56
57
58 def mount_osd_data(ctx, remote, cluster, osd):
59     """
60     Mount a remote OSD
61
62     :param ctx: Context
63     :param remote: Remote site
64     :param cluster: name of ceph cluster
65     :param osd: Osd name
66     """
67     log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
68     role = "{0}.osd.{1}".format(cluster, osd)
69     alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
70     if remote in ctx.disk_config.remote_to_roles_to_dev:
71         if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
72             role = alt_role
73         if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
74             return
75         dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
76         mount_options = ctx.disk_config.\
77             remote_to_roles_to_dev_mount_options[remote][role]
78         fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
79         mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
80
81         log.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
82                  'mountpoint: {p}, type: {t}, options: {v}'.format(
83                      o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
84                      c=cluster))
85
86         remote.run(
87             args=[
88                 'sudo',
89                 'mount',
90                 '-t', fstype,
91                 '-o', ','.join(mount_options),
92                 dev,
93                 mnt,
94             ]
95             )
96
97
98 class Thrasher:
99     """
100     Object used to thrash Ceph
101     """
102     def __init__(self, manager, config, logger=None):
103         self.ceph_manager = manager
104         self.cluster = manager.cluster
105         self.ceph_manager.wait_for_clean()
106         osd_status = self.ceph_manager.get_osd_status()
107         self.in_osds = osd_status['in']
108         self.live_osds = osd_status['live']
109         self.out_osds = osd_status['out']
110         self.dead_osds = osd_status['dead']
111         self.stopping = False
112         self.logger = logger
113         self.config = config
114         self.revive_timeout = self.config.get("revive_timeout", 360)
115         self.pools_to_fix_pgp_num = set()
116         if self.config.get('powercycle'):
117             self.revive_timeout += 120
118         self.clean_wait = self.config.get('clean_wait', 0)
119         self.minin = self.config.get("min_in", 4)
120         self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
121         self.sighup_delay = self.config.get('sighup_delay')
122         self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
123         self.dump_ops_enable = self.config.get('dump_ops_enable')
124         self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
125         self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
126         self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
127         self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap', 1.0)
128         self.random_eio = self.config.get('random_eio')
129         self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
130
131         num_osds = self.in_osds + self.out_osds
132         self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
133         if self.logger is not None:
134             self.log = lambda x: self.logger.info(x)
135         else:
136             def tmp(x):
137                 """
138                 Implement log behavior
139                 """
140                 print x
141             self.log = tmp
142         if self.config is None:
143             self.config = dict()
144         # prevent monitor from auto-marking things out while thrasher runs
145         # try both old and new tell syntax, in case we are testing old code
146         self.saved_options = []
147         # assuming that the default settings do not vary from one daemon to
148         # another
149         first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
150         opts = [('mon', 'mon_osd_down_out_interval', 0)]
151         for service, opt, new_value in opts:
152             old_value = manager.get_config(first_mon[0],
153                                            first_mon[1],
154                                            opt)
155             self.saved_options.append((service, opt, old_value))
156             self._set_config(service, '*', opt, new_value)
157         # initialize ceph_objectstore_tool property - must be done before
158         # do_thrash is spawned - http://tracker.ceph.com/issues/18799
159         if (self.config.get('powercycle') or
160             not self.cmd_exists_on_osds("ceph-objectstore-tool") or
161             self.config.get('disable_objectstore_tool_tests', False)):
162             self.ceph_objectstore_tool = False
163             self.test_rm_past_intervals = False
164             if self.config.get('powercycle'):
165                 self.log("Unable to test ceph-objectstore-tool, "
166                          "powercycle testing")
167             else:
168                 self.log("Unable to test ceph-objectstore-tool, "
169                          "not available on all OSD nodes")
170         else:
171             self.ceph_objectstore_tool = \
172                 self.config.get('ceph_objectstore_tool', True)
173             self.test_rm_past_intervals = \
174                 self.config.get('test_rm_past_intervals', True)
175         # spawn do_thrash
176         self.thread = gevent.spawn(self.do_thrash)
177         if self.sighup_delay:
178             self.sighup_thread = gevent.spawn(self.do_sighup)
179         if self.optrack_toggle_delay:
180             self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
181         if self.dump_ops_enable == "true":
182             self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
183         if self.noscrub_toggle_delay:
184             self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
185
186     def _set_config(self, service_type, service_id, name, value):
187         opt_arg = '--{name} {value}'.format(name=name, value=value)
188         whom = '.'.join([service_type, service_id])
189         self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
190                                           'injectargs', opt_arg)
191
192
193     def cmd_exists_on_osds(self, cmd):
194         allremotes = self.ceph_manager.ctx.cluster.only(\
195             teuthology.is_type('osd', self.cluster)).remotes.keys()
196         allremotes = list(set(allremotes))
197         for remote in allremotes:
198             proc = remote.run(args=['type', cmd], wait=True,
199                               check_status=False, stdout=StringIO(),
200                               stderr=StringIO())
201             if proc.exitstatus != 0:
202                 return False;
203         return True;
204
205     def kill_osd(self, osd=None, mark_down=False, mark_out=False):
206         """
207         :param osd: Osd to be killed.
208         :mark_down: Mark down if true.
209         :mark_out: Mark out if true.
210         """
211         if osd is None:
212             osd = random.choice(self.live_osds)
213         self.log("Killing osd %s, live_osds are %s" % (str(osd),
214                                                        str(self.live_osds)))
215         self.live_osds.remove(osd)
216         self.dead_osds.append(osd)
217         self.ceph_manager.kill_osd(osd)
218         if mark_down:
219             self.ceph_manager.mark_down_osd(osd)
220         if mark_out and osd in self.in_osds:
221             self.out_osd(osd)
222         if self.ceph_objectstore_tool:
223             self.log("Testing ceph-objectstore-tool on down osd")
224             remote = self.ceph_manager.find_remote('osd', osd)
225             FSPATH = self.ceph_manager.get_filepath()
226             JPATH = os.path.join(FSPATH, "journal")
227             exp_osd = imp_osd = osd
228             exp_remote = imp_remote = remote
229             # If an older osd is available we'll move a pg from there
230             if (len(self.dead_osds) > 1 and
231                     random.random() < self.chance_move_pg):
232                 exp_osd = random.choice(self.dead_osds[:-1])
233                 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
234             if ('keyvaluestore_backend' in
235                     self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
236                 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
237                           "--data-path {fpath} --journal-path {jpath} "
238                           "--type keyvaluestore "
239                           "--log-file="
240                           "/var/log/ceph/objectstore_tool.\\$pid.log ".
241                           format(fpath=FSPATH, jpath=JPATH))
242             else:
243                 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
244                           "--data-path {fpath} --journal-path {jpath} "
245                           "--log-file="
246                           "/var/log/ceph/objectstore_tool.\\$pid.log ".
247                           format(fpath=FSPATH, jpath=JPATH))
248             cmd = (prefix + "--op list-pgs").format(id=exp_osd)
249
250             # ceph-objectstore-tool might be temporarily absent during an 
251             # upgrade - see http://tracker.ceph.com/issues/18014
252             with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
253                 while proceed():
254                     proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'], 
255                                wait=True, check_status=False, stdout=StringIO(),
256                                stderr=StringIO())
257                     if proc.exitstatus == 0:
258                         break
259                     log.debug("ceph-objectstore-tool binary not present, trying again")
260
261             # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
262             # see http://tracker.ceph.com/issues/19556
263             with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
264                 while proceed():
265                     proc = exp_remote.run(args=cmd, wait=True,
266                                           check_status=False,
267                                           stdout=StringIO(), stderr=StringIO())
268                     if proc.exitstatus == 0:
269                         break
270                     elif proc.exitstatus == 1 and proc.stderr == "OSD has the store locked":
271                         continue
272                     else:
273                         raise Exception("ceph-objectstore-tool: "
274                                         "exp list-pgs failure with status {ret}".
275                                         format(ret=proc.exitstatus))
276
277             pgs = proc.stdout.getvalue().split('\n')[:-1]
278             if len(pgs) == 0:
279                 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
280                 return
281             pg = random.choice(pgs)
282             exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
283             exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
284             exp_path = os.path.join(exp_path,
285                                     "exp.{pg}.{id}".format(
286                                         pg=pg,
287                                         id=exp_osd))
288             # export
289             # Can't use new export-remove op since this is part of upgrade testing
290             cmd = prefix + "--op export --pgid {pg} --file {file}"
291             cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
292             proc = exp_remote.run(args=cmd)
293             if proc.exitstatus:
294                 raise Exception("ceph-objectstore-tool: "
295                                 "export failure with status {ret}".
296                                 format(ret=proc.exitstatus))
297             # remove
298             cmd = prefix + "--force --op remove --pgid {pg}"
299             cmd = cmd.format(id=exp_osd, pg=pg)
300             proc = exp_remote.run(args=cmd)
301             if proc.exitstatus:
302                 raise Exception("ceph-objectstore-tool: "
303                                 "remove failure with status {ret}".
304                                 format(ret=proc.exitstatus))
305             # If there are at least 2 dead osds we might move the pg
306             if exp_osd != imp_osd:
307                 # If pg isn't already on this osd, then we will move it there
308                 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
309                 proc = imp_remote.run(args=cmd, wait=True,
310                                       check_status=False, stdout=StringIO())
311                 if proc.exitstatus:
312                     raise Exception("ceph-objectstore-tool: "
313                                     "imp list-pgs failure with status {ret}".
314                                     format(ret=proc.exitstatus))
315                 pgs = proc.stdout.getvalue().split('\n')[:-1]
316                 if pg not in pgs:
317                     self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
318                              format(pg=pg, fosd=exp_osd, tosd=imp_osd))
319                     if imp_remote != exp_remote:
320                         # Copy export file to the other machine
321                         self.log("Transfer export file from {srem} to {trem}".
322                                  format(srem=exp_remote, trem=imp_remote))
323                         tmpexport = Remote.get_file(exp_remote, exp_path)
324                         Remote.put_file(imp_remote, tmpexport, exp_path)
325                         os.remove(tmpexport)
326                 else:
327                     # Can't move the pg after all
328                     imp_osd = exp_osd
329                     imp_remote = exp_remote
330             # import
331             cmd = (prefix + "--op import --file {file}")
332             cmd = cmd.format(id=imp_osd, file=exp_path)
333             proc = imp_remote.run(args=cmd, wait=True, check_status=False,
334                                   stderr=StringIO())
335             if proc.exitstatus == 1:
336                 bogosity = "The OSD you are using is older than the exported PG"
337                 if bogosity in proc.stderr.getvalue():
338                     self.log("OSD older than exported PG"
339                              "...ignored")
340             elif proc.exitstatus == 10:
341                 self.log("Pool went away before processing an import"
342                          "...ignored")
343             elif proc.exitstatus == 11:
344                 self.log("Attempt to import an incompatible export"
345                          "...ignored")
346             elif proc.exitstatus:
347                 raise Exception("ceph-objectstore-tool: "
348                                 "import failure with status {ret}".
349                                 format(ret=proc.exitstatus))
350             cmd = "rm -f {file}".format(file=exp_path)
351             exp_remote.run(args=cmd)
352             if imp_remote != exp_remote:
353                 imp_remote.run(args=cmd)
354
355             # apply low split settings to each pool
356             for pool in self.ceph_manager.list_pools():
357                 no_sudo_prefix = prefix[5:]
358                 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
359                        "--filestore-split-multiple 1' sudo -E "
360                        + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
361                 proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
362                 output = proc.stderr.getvalue()
363                 if 'Couldn\'t find pool' in output:
364                     continue
365                 if proc.exitstatus:
366                     raise Exception("ceph-objectstore-tool apply-layout-settings"
367                                     " failed with {status}".format(status=proc.exitstatus))
368
369     def rm_past_intervals(self, osd=None):
370         """
371         :param osd: Osd to find pg to remove past intervals
372         """
373         if self.test_rm_past_intervals:
374             if osd is None:
375                 osd = random.choice(self.dead_osds)
376             self.log("Use ceph_objectstore_tool to remove past intervals")
377             remote = self.ceph_manager.find_remote('osd', osd)
378             FSPATH = self.ceph_manager.get_filepath()
379             JPATH = os.path.join(FSPATH, "journal")
380             if ('keyvaluestore_backend' in
381                     self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
382                 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
383                           "--data-path {fpath} --journal-path {jpath} "
384                           "--type keyvaluestore "
385                           "--log-file="
386                           "/var/log/ceph/objectstore_tool.\\$pid.log ".
387                           format(fpath=FSPATH, jpath=JPATH))
388             else:
389                 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
390                           "--data-path {fpath} --journal-path {jpath} "
391                           "--log-file="
392                           "/var/log/ceph/objectstore_tool.\\$pid.log ".
393                           format(fpath=FSPATH, jpath=JPATH))
394             cmd = (prefix + "--op list-pgs").format(id=osd)
395             proc = remote.run(args=cmd, wait=True,
396                               check_status=False, stdout=StringIO())
397             if proc.exitstatus:
398                 raise Exception("ceph_objectstore_tool: "
399                                 "exp list-pgs failure with status {ret}".
400                                 format(ret=proc.exitstatus))
401             pgs = proc.stdout.getvalue().split('\n')[:-1]
402             if len(pgs) == 0:
403                 self.log("No PGs found for osd.{osd}".format(osd=osd))
404                 return
405             pg = random.choice(pgs)
406             cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
407                 format(id=osd, pg=pg)
408             proc = remote.run(args=cmd)
409             if proc.exitstatus:
410                 raise Exception("ceph_objectstore_tool: "
411                                 "rm-past-intervals failure with status {ret}".
412                                 format(ret=proc.exitstatus))
413
414     def blackhole_kill_osd(self, osd=None):
415         """
416         If all else fails, kill the osd.
417         :param osd: Osd to be killed.
418         """
419         if osd is None:
420             osd = random.choice(self.live_osds)
421         self.log("Blackholing and then killing osd %s, live_osds are %s" %
422                  (str(osd), str(self.live_osds)))
423         self.live_osds.remove(osd)
424         self.dead_osds.append(osd)
425         self.ceph_manager.blackhole_kill_osd(osd)
426
427     def revive_osd(self, osd=None, skip_admin_check=False):
428         """
429         Revive the osd.
430         :param osd: Osd to be revived.
431         """
432         if osd is None:
433             osd = random.choice(self.dead_osds)
434         self.log("Reviving osd %s" % (str(osd),))
435         self.ceph_manager.revive_osd(
436             osd,
437             self.revive_timeout,
438             skip_admin_check=skip_admin_check)
439         self.dead_osds.remove(osd)
440         self.live_osds.append(osd)
441         if self.random_eio > 0 and osd is self.rerrosd:
442             self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
443                           'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
444             self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
445                           'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
446
447
448     def out_osd(self, osd=None):
449         """
450         Mark the osd out
451         :param osd: Osd to be marked.
452         """
453         if osd is None:
454             osd = random.choice(self.in_osds)
455         self.log("Removing osd %s, in_osds are: %s" %
456                  (str(osd), str(self.in_osds)))
457         self.ceph_manager.mark_out_osd(osd)
458         self.in_osds.remove(osd)
459         self.out_osds.append(osd)
460
461     def in_osd(self, osd=None):
462         """
463         Mark the osd out
464         :param osd: Osd to be marked.
465         """
466         if osd is None:
467             osd = random.choice(self.out_osds)
468         if osd in self.dead_osds:
469             return self.revive_osd(osd)
470         self.log("Adding osd %s" % (str(osd),))
471         self.out_osds.remove(osd)
472         self.in_osds.append(osd)
473         self.ceph_manager.mark_in_osd(osd)
474         self.log("Added osd %s" % (str(osd),))
475
476     def reweight_osd_or_by_util(self, osd=None):
477         """
478         Reweight an osd that is in
479         :param osd: Osd to be marked.
480         """
481         if osd is not None or random.choice([True, False]):
482             if osd is None:
483                 osd = random.choice(self.in_osds)
484             val = random.uniform(.1, 1.0)
485             self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
486             self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
487                                               str(osd), str(val))
488         else:
489             # do it several times, the option space is large
490             for i in range(5):
491                 options = {
492                     'max_change': random.choice(['0.05', '1.0', '3.0']),
493                     'overage': random.choice(['110', '1000']),
494                     'type': random.choice([
495                         'reweight-by-utilization',
496                         'test-reweight-by-utilization']),
497                 }
498                 self.log("Reweighting by: %s"%(str(options),))
499                 self.ceph_manager.raw_cluster_cmd(
500                     'osd',
501                     options['type'],
502                     options['overage'],
503                     options['max_change'])
504
505     def primary_affinity(self, osd=None):
506         if osd is None:
507             osd = random.choice(self.in_osds)
508         if random.random() >= .5:
509             pa = random.random()
510         elif random.random() >= .5:
511             pa = 1
512         else:
513             pa = 0
514         self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
515         self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
516                                           str(osd), str(pa))
517
518     def thrash_cluster_full(self):
519         """
520         Set and unset cluster full condition
521         """
522         self.log('Setting full ratio to .001')
523         self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
524         time.sleep(1)
525         self.log('Setting full ratio back to .95')
526         self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
527
528     def thrash_pg_upmap(self):
529         """
530         Install or remove random pg_upmap entries in OSDMap
531         """
532         from random import shuffle
533         out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
534         j = json.loads(out)
535         self.log('j is %s' % j)
536         try:
537             if random.random() >= .3:
538                 pgs = self.ceph_manager.get_pg_stats()
539                 pg = random.choice(pgs)
540                 pgid = str(pg['pgid'])
541                 poolid = int(pgid.split('.')[0])
542                 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
543                 if len(sizes) == 0:
544                     return
545                 n = sizes[0]
546                 osds = self.in_osds + self.out_osds
547                 shuffle(osds)
548                 osds = osds[0:n]
549                 self.log('Setting %s to %s' % (pgid, osds))
550                 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
551                 self.log('cmd %s' % cmd)
552                 self.ceph_manager.raw_cluster_cmd(*cmd)
553             else:
554                 m = j['pg_upmap']
555                 if len(m) > 0:
556                     shuffle(m)
557                     pg = m[0]['pgid']
558                     self.log('Clearing pg_upmap on %s' % pg)
559                     self.ceph_manager.raw_cluster_cmd(
560                         'osd',
561                         'rm-pg-upmap',
562                         pg)
563                 else:
564                     self.log('No pg_upmap entries; doing nothing')
565         except CommandFailedError:
566             self.log('Failed to rm-pg-upmap, ignoring')
567
568     def thrash_pg_upmap_items(self):
569         """
570         Install or remove random pg_upmap_items entries in OSDMap
571         """
572         from random import shuffle
573         out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
574         j = json.loads(out)
575         self.log('j is %s' % j)
576         try:
577             if random.random() >= .3:
578                 pgs = self.ceph_manager.get_pg_stats()
579                 pg = random.choice(pgs)
580                 pgid = str(pg['pgid'])
581                 poolid = int(pgid.split('.')[0])
582                 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
583                 if len(sizes) == 0:
584                     return
585                 n = sizes[0]
586                 osds = self.in_osds + self.out_osds
587                 shuffle(osds)
588                 osds = osds[0:n*2]
589                 self.log('Setting %s to %s' % (pgid, osds))
590                 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
591                 self.log('cmd %s' % cmd)
592                 self.ceph_manager.raw_cluster_cmd(*cmd)
593             else:
594                 m = j['pg_upmap_items']
595                 if len(m) > 0:
596                     shuffle(m)
597                     pg = m[0]['pgid']
598                     self.log('Clearing pg_upmap on %s' % pg)
599                     self.ceph_manager.raw_cluster_cmd(
600                         'osd',
601                         'rm-pg-upmap-items',
602                         pg)
603                 else:
604                     self.log('No pg_upmap entries; doing nothing')
605         except CommandFailedError:
606             self.log('Failed to rm-pg-upmap-items, ignoring')
607
608     def force_recovery(self):
609         """
610         Force recovery on some of PGs
611         """
612         backfill = random.random() >= 0.5
613         j = self.ceph_manager.get_pgids_to_force(backfill)
614         if j:
615             if backfill:
616                 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
617             else:
618                 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
619
620     def cancel_force_recovery(self):
621         """
622         Force recovery on some of PGs
623         """
624         backfill = random.random() >= 0.5
625         j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
626         if j:
627             if backfill:
628                 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
629             else:
630                 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
631
632     def force_cancel_recovery(self):
633         """
634         Force or cancel forcing recovery
635         """
636         if random.random() >= 0.4:
637            self.force_recovery()
638         else:
639            self.cancel_force_recovery()
640
641     def all_up(self):
642         """
643         Make sure all osds are up and not out.
644         """
645         while len(self.dead_osds) > 0:
646             self.log("reviving osd")
647             self.revive_osd()
648         while len(self.out_osds) > 0:
649             self.log("inning osd")
650             self.in_osd()
651
652     def all_up_in(self):
653         """
654         Make sure all osds are up and fully in.
655         """
656         self.all_up();
657         for osd in self.live_osds:
658             self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
659                                               str(osd), str(1))
660             self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
661                                               str(osd), str(1))
662
663     def do_join(self):
664         """
665         Break out of this Ceph loop
666         """
667         self.stopping = True
668         self.thread.get()
669         if self.sighup_delay:
670             self.log("joining the do_sighup greenlet")
671             self.sighup_thread.get()
672         if self.optrack_toggle_delay:
673             self.log("joining the do_optrack_toggle greenlet")
674             self.optrack_toggle_thread.join()
675         if self.dump_ops_enable == "true":
676             self.log("joining the do_dump_ops greenlet")
677             self.dump_ops_thread.join()
678         if self.noscrub_toggle_delay:
679             self.log("joining the do_noscrub_toggle greenlet")
680             self.noscrub_toggle_thread.join()
681
682     def grow_pool(self):
683         """
684         Increase the size of the pool
685         """
686         pool = self.ceph_manager.get_pool()
687         orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
688         self.log("Growing pool %s" % (pool,))
689         if self.ceph_manager.expand_pool(pool,
690                                          self.config.get('pool_grow_by', 10),
691                                          self.max_pgs):
692             self.pools_to_fix_pgp_num.add(pool)
693
694     def fix_pgp_num(self, pool=None):
695         """
696         Fix number of pgs in pool.
697         """
698         if pool is None:
699             pool = self.ceph_manager.get_pool()
700             force = False
701         else:
702             force = True
703         self.log("fixing pg num pool %s" % (pool,))
704         if self.ceph_manager.set_pool_pgpnum(pool, force):
705             self.pools_to_fix_pgp_num.discard(pool)
706
707     def test_pool_min_size(self):
708         """
709         Kill and revive all osds except one.
710         """
711         self.log("test_pool_min_size")
712         self.all_up()
713         self.ceph_manager.wait_for_recovery(
714             timeout=self.config.get('timeout')
715             )
716         the_one = random.choice(self.in_osds)
717         self.log("Killing everyone but %s", the_one)
718         to_kill = filter(lambda x: x != the_one, self.in_osds)
719         [self.kill_osd(i) for i in to_kill]
720         [self.out_osd(i) for i in to_kill]
721         time.sleep(self.config.get("test_pool_min_size_time", 10))
722         self.log("Killing %s" % (the_one,))
723         self.kill_osd(the_one)
724         self.out_osd(the_one)
725         self.log("Reviving everyone but %s" % (the_one,))
726         [self.revive_osd(i) for i in to_kill]
727         [self.in_osd(i) for i in to_kill]
728         self.log("Revived everyone but %s" % (the_one,))
729         self.log("Waiting for clean")
730         self.ceph_manager.wait_for_recovery(
731             timeout=self.config.get('timeout')
732             )
733
734     def inject_pause(self, conf_key, duration, check_after, should_be_down):
735         """
736         Pause injection testing. Check for osd being down when finished.
737         """
738         the_one = random.choice(self.live_osds)
739         self.log("inject_pause on {osd}".format(osd=the_one))
740         self.log(
741             "Testing {key} pause injection for duration {duration}".format(
742                 key=conf_key,
743                 duration=duration
744                 ))
745         self.log(
746             "Checking after {after}, should_be_down={shouldbedown}".format(
747                 after=check_after,
748                 shouldbedown=should_be_down
749                 ))
750         self.ceph_manager.set_config(the_one, **{conf_key: duration})
751         if not should_be_down:
752             return
753         time.sleep(check_after)
754         status = self.ceph_manager.get_osd_status()
755         assert the_one in status['down']
756         time.sleep(duration - check_after + 20)
757         status = self.ceph_manager.get_osd_status()
758         assert not the_one in status['down']
759
760     def test_backfill_full(self):
761         """
762         Test backfills stopping when the replica fills up.
763
764         First, use injectfull admin command to simulate a now full
765         osd by setting it to 0 on all of the OSDs.
766
767         Second, on a random subset, set
768         osd_debug_skip_full_check_in_backfill_reservation to force
769         the more complicated check in do_scan to be exercised.
770
771         Then, verify that all backfillings stop.
772         """
773         self.log("injecting backfill full")
774         for i in self.live_osds:
775             self.ceph_manager.set_config(
776                 i,
777                 osd_debug_skip_full_check_in_backfill_reservation=
778                 random.choice(['false', 'true']))
779             self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
780                                      check_status=True, timeout=30, stdout=DEVNULL)
781         for i in range(30):
782             status = self.ceph_manager.compile_pg_status()
783             if 'backfilling' not in status.keys():
784                 break
785             self.log(
786                 "waiting for {still_going} backfillings".format(
787                     still_going=status.get('backfilling')))
788             time.sleep(1)
789         assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
790         for i in self.live_osds:
791             self.ceph_manager.set_config(
792                 i,
793                 osd_debug_skip_full_check_in_backfill_reservation='false')
794             self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
795                                      check_status=True, timeout=30, stdout=DEVNULL)
796
797     def test_map_discontinuity(self):
798         """
799         1) Allows the osds to recover
800         2) kills an osd
801         3) allows the remaining osds to recover
802         4) waits for some time
803         5) revives the osd
804         This sequence should cause the revived osd to have to handle
805         a map gap since the mons would have trimmed
806         """
807         while len(self.in_osds) < (self.minin + 1):
808             self.in_osd()
809         self.log("Waiting for recovery")
810         self.ceph_manager.wait_for_all_osds_up(
811             timeout=self.config.get('timeout')
812             )
813         # now we wait 20s for the pg status to change, if it takes longer,
814         # the test *should* fail!
815         time.sleep(20)
816         self.ceph_manager.wait_for_clean(
817             timeout=self.config.get('timeout')
818             )
819
820         # now we wait 20s for the backfill replicas to hear about the clean
821         time.sleep(20)
822         self.log("Recovered, killing an osd")
823         self.kill_osd(mark_down=True, mark_out=True)
824         self.log("Waiting for clean again")
825         self.ceph_manager.wait_for_clean(
826             timeout=self.config.get('timeout')
827             )
828         self.log("Waiting for trim")
829         time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
830         self.revive_osd()
831
832     def choose_action(self):
833         """
834         Random action selector.
835         """
836         chance_down = self.config.get('chance_down', 0.4)
837         chance_test_min_size = self.config.get('chance_test_min_size', 0)
838         chance_test_backfill_full = \
839             self.config.get('chance_test_backfill_full', 0)
840         if isinstance(chance_down, int):
841             chance_down = float(chance_down) / 100
842         minin = self.minin
843         minout = self.config.get("min_out", 0)
844         minlive = self.config.get("min_live", 2)
845         mindead = self.config.get("min_dead", 0)
846
847         self.log('choose_action: min_in %d min_out '
848                  '%d min_live %d min_dead %d' %
849                  (minin, minout, minlive, mindead))
850         actions = []
851         if len(self.in_osds) > minin:
852             actions.append((self.out_osd, 1.0,))
853         if len(self.live_osds) > minlive and chance_down > 0:
854             actions.append((self.kill_osd, chance_down,))
855         if len(self.dead_osds) > 1:
856             actions.append((self.rm_past_intervals, 1.0,))
857         if len(self.out_osds) > minout:
858             actions.append((self.in_osd, 1.7,))
859         if len(self.dead_osds) > mindead:
860             actions.append((self.revive_osd, 1.0,))
861         if self.config.get('thrash_primary_affinity', True):
862             actions.append((self.primary_affinity, 1.0,))
863         actions.append((self.reweight_osd_or_by_util,
864                         self.config.get('reweight_osd', .5),))
865         actions.append((self.grow_pool,
866                         self.config.get('chance_pgnum_grow', 0),))
867         actions.append((self.fix_pgp_num,
868                         self.config.get('chance_pgpnum_fix', 0),))
869         actions.append((self.test_pool_min_size,
870                         chance_test_min_size,))
871         actions.append((self.test_backfill_full,
872                         chance_test_backfill_full,))
873         if self.chance_thrash_cluster_full > 0:
874             actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
875         if self.chance_thrash_pg_upmap > 0:
876             actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
877         if self.chance_thrash_pg_upmap_items > 0:
878             actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
879         if self.chance_force_recovery > 0:
880             actions.append((self.force_cancel_recovery, self.chance_force_recovery))
881
882         for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
883             for scenario in [
884                 (lambda:
885                  self.inject_pause(key,
886                                    self.config.get('pause_short', 3),
887                                    0,
888                                    False),
889                  self.config.get('chance_inject_pause_short', 1),),
890                 (lambda:
891                  self.inject_pause(key,
892                                    self.config.get('pause_long', 80),
893                                    self.config.get('pause_check_after', 70),
894                                    True),
895                  self.config.get('chance_inject_pause_long', 0),)]:
896                 actions.append(scenario)
897
898         total = sum([y for (x, y) in actions])
899         val = random.uniform(0, total)
900         for (action, prob) in actions:
901             if val < prob:
902                 return action
903             val -= prob
904         return None
905
906     def log_exc(func):
907         @wraps(func)
908         def wrapper(self):
909             try:
910                 return func(self)
911             except:
912                 self.log(traceback.format_exc())
913                 raise
914         return wrapper
915
916     @log_exc
917     def do_sighup(self):
918         """
919         Loops and sends signal.SIGHUP to a random live osd.
920
921         Loop delay is controlled by the config value sighup_delay.
922         """
923         delay = float(self.sighup_delay)
924         self.log("starting do_sighup with a delay of {0}".format(delay))
925         while not self.stopping:
926             osd = random.choice(self.live_osds)
927             self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
928             time.sleep(delay)
929
930     @log_exc
931     def do_optrack_toggle(self):
932         """
933         Loops and toggle op tracking to all osds.
934
935         Loop delay is controlled by the config value optrack_toggle_delay.
936         """
937         delay = float(self.optrack_toggle_delay)
938         osd_state = "true"
939         self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
940         while not self.stopping:
941             if osd_state == "true":
942                 osd_state = "false"
943             else:
944                 osd_state = "true"
945             self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
946                              'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
947             gevent.sleep(delay)
948
949     @log_exc
950     def do_dump_ops(self):
951         """
952         Loops and does op dumps on all osds
953         """
954         self.log("starting do_dump_ops")
955         while not self.stopping:
956             for osd in self.live_osds:
957                 # Ignore errors because live_osds is in flux
958                 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
959                                      check_status=False, timeout=30, stdout=DEVNULL)
960                 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
961                                      check_status=False, timeout=30, stdout=DEVNULL)
962                 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
963                                      check_status=False, timeout=30, stdout=DEVNULL)
964             gevent.sleep(0)
965
966     @log_exc
967     def do_noscrub_toggle(self):
968         """
969         Loops and toggle noscrub flags
970
971         Loop delay is controlled by the config value noscrub_toggle_delay.
972         """
973         delay = float(self.noscrub_toggle_delay)
974         scrub_state = "none"
975         self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
976         while not self.stopping:
977             if scrub_state == "none":
978                 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
979                 scrub_state = "noscrub"
980             elif scrub_state == "noscrub":
981                 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
982                 scrub_state = "both"
983             elif scrub_state == "both":
984                 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
985                 scrub_state = "nodeep-scrub"
986             else:
987                 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
988                 scrub_state = "none"
989             gevent.sleep(delay)
990         self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
991         self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
992
993     @log_exc
994     def do_thrash(self):
995         """
996         Loop to select random actions to thrash ceph manager with.
997         """
998         cleanint = self.config.get("clean_interval", 60)
999         scrubint = self.config.get("scrub_interval", -1)
1000         maxdead = self.config.get("max_dead", 0)
1001         delay = self.config.get("op_delay", 5)
1002         self.rerrosd = self.live_osds[0]
1003         if self.random_eio > 0:
1004             self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1005                           'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
1006             self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1007                           'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
1008         self.log("starting do_thrash")
1009         while not self.stopping:
1010             to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1011                                        "out_osds: ", self.out_osds,
1012                                        "dead_osds: ", self.dead_osds,
1013                                        "live_osds: ", self.live_osds]]
1014             self.log(" ".join(to_log))
1015             if random.uniform(0, 1) < (float(delay) / cleanint):
1016                 while len(self.dead_osds) > maxdead:
1017                     self.revive_osd()
1018                 for osd in self.in_osds:
1019                     self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1020                                                       str(osd), str(1))
1021                 if random.uniform(0, 1) < float(
1022                         self.config.get('chance_test_map_discontinuity', 0)):
1023                     self.test_map_discontinuity()
1024                 else:
1025                     self.ceph_manager.wait_for_recovery(
1026                         timeout=self.config.get('timeout')
1027                         )
1028                 time.sleep(self.clean_wait)
1029                 if scrubint > 0:
1030                     if random.uniform(0, 1) < (float(delay) / scrubint):
1031                         self.log('Scrubbing while thrashing being performed')
1032                         Scrubber(self.ceph_manager, self.config)
1033             self.choose_action()()
1034             time.sleep(delay)
1035         self.all_up()
1036         if self.random_eio > 0:
1037             self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1038                           'injectargs', '--', '--filestore_debug_random_read_err=0.0')
1039             self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1040                           'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
1041         for pool in list(self.pools_to_fix_pgp_num):
1042             if self.ceph_manager.get_pool_pg_num(pool) > 0:
1043                 self.fix_pgp_num(pool)
1044         self.pools_to_fix_pgp_num.clear()
1045         for service, opt, saved_value in self.saved_options:
1046             self._set_config(service, '*', opt, saved_value)
1047         self.saved_options = []
1048         self.all_up_in()
1049
1050
1051 class ObjectStoreTool:
1052
1053     def __init__(self, manager, pool, **kwargs):
1054         self.manager = manager
1055         self.pool = pool
1056         self.osd = kwargs.get('osd', None)
1057         self.object_name = kwargs.get('object_name', None)
1058         self.do_revive = kwargs.get('do_revive', True)
1059         if self.osd and self.pool and self.object_name:
1060             if self.osd == "primary":
1061                 self.osd = self.manager.get_object_primary(self.pool,
1062                                                            self.object_name)
1063         assert self.osd
1064         if self.object_name:
1065             self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1066                                                               self.object_name,
1067                                                               self.osd)
1068         self.remote = self.manager.ctx.\
1069             cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
1070         path = self.manager.get_filepath().format(id=self.osd)
1071         self.paths = ("--data-path {path} --journal-path {path}/journal".
1072                       format(path=path))
1073
1074     def build_cmd(self, options, args, stdin):
1075         lines = []
1076         if self.object_name:
1077             lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1078                          "{paths} --pgid {pgid} --op list |"
1079                          "grep '\"oid\":\"{name}\"')".
1080                          format(paths=self.paths,
1081                                 pgid=self.pgid,
1082                                 name=self.object_name))
1083             args = '"$object" ' + args
1084             options += " --pgid {pgid}".format(pgid=self.pgid)
1085         cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1086                format(paths=self.paths,
1087                       args=args,
1088                       options=options))
1089         if stdin:
1090             cmd = ("echo {payload} | base64 --decode | {cmd}".
1091                    format(payload=base64.encode(stdin),
1092                           cmd=cmd))
1093         lines.append(cmd)
1094         return "\n".join(lines)
1095
1096     def run(self, options, args, stdin=None, stdout=None):
1097         if stdout is None:
1098             stdout = StringIO()
1099         self.manager.kill_osd(self.osd)
1100         cmd = self.build_cmd(options, args, stdin)
1101         self.manager.log(cmd)
1102         try:
1103             proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1104                                    check_status=False,
1105                                    stdout=stdout,
1106                                    stderr=StringIO())
1107             proc.wait()
1108             if proc.exitstatus != 0:
1109                 self.manager.log("failed with " + str(proc.exitstatus))
1110                 error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
1111                 raise Exception(error)
1112         finally:
1113             if self.do_revive:
1114                 self.manager.revive_osd(self.osd)
1115                 self.manager.wait_till_osd_is_up(self.osd, 300)
1116
1117
1118 class CephManager:
1119     """
1120     Ceph manager object.
1121     Contains several local functions that form a bulk of this module.
1122
1123     Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1124     the same name.
1125     """
1126
1127     REPLICATED_POOL = 1
1128     ERASURE_CODED_POOL = 3
1129
1130     def __init__(self, controller, ctx=None, config=None, logger=None,
1131                  cluster='ceph'):
1132         self.lock = threading.RLock()
1133         self.ctx = ctx
1134         self.config = config
1135         self.controller = controller
1136         self.next_pool_id = 0
1137         self.cluster = cluster
1138         if (logger):
1139             self.log = lambda x: logger.info(x)
1140         else:
1141             def tmp(x):
1142                 """
1143                 implement log behavior.
1144                 """
1145                 print x
1146             self.log = tmp
1147         if self.config is None:
1148             self.config = dict()
1149         pools = self.list_pools()
1150         self.pools = {}
1151         for pool in pools:
1152             # we may race with a pool deletion; ignore failures here
1153             try:
1154                 self.pools[pool] = self.get_pool_property(pool, 'pg_num')
1155             except CommandFailedError:
1156                 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1157
1158     def raw_cluster_cmd(self, *args):
1159         """
1160         Start ceph on a raw cluster.  Return count
1161         """
1162         testdir = teuthology.get_testdir(self.ctx)
1163         ceph_args = [
1164             'sudo',
1165             'adjust-ulimits',
1166             'ceph-coverage',
1167             '{tdir}/archive/coverage'.format(tdir=testdir),
1168             'timeout',
1169             '120',
1170             'ceph',
1171             '--cluster',
1172             self.cluster,
1173         ]
1174         ceph_args.extend(args)
1175         proc = self.controller.run(
1176             args=ceph_args,
1177             stdout=StringIO(),
1178             )
1179         return proc.stdout.getvalue()
1180
1181     def raw_cluster_cmd_result(self, *args):
1182         """
1183         Start ceph on a cluster.  Return success or failure information.
1184         """
1185         testdir = teuthology.get_testdir(self.ctx)
1186         ceph_args = [
1187             'sudo',
1188             'adjust-ulimits',
1189             'ceph-coverage',
1190             '{tdir}/archive/coverage'.format(tdir=testdir),
1191             'timeout',
1192             '120',
1193             'ceph',
1194             '--cluster',
1195             self.cluster,
1196         ]
1197         ceph_args.extend(args)
1198         proc = self.controller.run(
1199             args=ceph_args,
1200             check_status=False,
1201             )
1202         return proc.exitstatus
1203
1204     def run_ceph_w(self):
1205         """
1206         Execute "ceph -w" in the background with stdout connected to a StringIO,
1207         and return the RemoteProcess.
1208         """
1209         return self.controller.run(
1210             args=["sudo",
1211                   "daemon-helper",
1212                   "kill",
1213                   "ceph",
1214                   '--cluster',
1215                   self.cluster,
1216                   "-w"],
1217             wait=False, stdout=StringIO(), stdin=run.PIPE)
1218
1219     def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1220         """
1221         Flush pg stats from a list of OSD ids, ensuring they are reflected
1222         all the way to the monitor.  Luminous and later only.
1223
1224         :param osds: list of OSDs to flush
1225         :param no_wait: list of OSDs not to wait for seq id. by default, we
1226                         wait for all specified osds, but some of them could be
1227                         moved out of osdmap, so we cannot get their updated
1228                         stat seq from monitor anymore. in that case, you need
1229                         to pass a blacklist.
1230         :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1231                              it. (5 min by default)
1232         """
1233         seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
1234                for osd in osds}
1235         if not wait_for_mon:
1236             return
1237         if no_wait is None:
1238             no_wait = []
1239         for osd, need in seq.iteritems():
1240             if osd in no_wait:
1241                 continue
1242             got = 0
1243             while wait_for_mon > 0:
1244                 got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
1245                 self.log('need seq {need} got {got} for osd.{osd}'.format(
1246                     need=need, got=got, osd=osd))
1247                 if got >= need:
1248                     break
1249                 A_WHILE = 1
1250                 time.sleep(A_WHILE)
1251                 wait_for_mon -= A_WHILE
1252             else:
1253                 raise Exception('timed out waiting for mon to be updated with '
1254                                 'osd.{osd}: {got} < {need}'.
1255                                 format(osd=osd, got=got, need=need))
1256
1257     def flush_all_pg_stats(self):
1258         self.flush_pg_stats(range(len(self.get_osd_dump())))
1259
1260     def do_rados(self, remote, cmd, check_status=True):
1261         """
1262         Execute a remote rados command.
1263         """
1264         testdir = teuthology.get_testdir(self.ctx)
1265         pre = [
1266             'adjust-ulimits',
1267             'ceph-coverage',
1268             '{tdir}/archive/coverage'.format(tdir=testdir),
1269             'rados',
1270             '--cluster',
1271             self.cluster,
1272             ]
1273         pre.extend(cmd)
1274         proc = remote.run(
1275             args=pre,
1276             wait=True,
1277             check_status=check_status
1278             )
1279         return proc
1280
1281     def rados_write_objects(self, pool, num_objects, size,
1282                             timelimit, threads, cleanup=False):
1283         """
1284         Write rados objects
1285         Threads not used yet.
1286         """
1287         args = [
1288             '-p', pool,
1289             '--num-objects', num_objects,
1290             '-b', size,
1291             'bench', timelimit,
1292             'write'
1293             ]
1294         if not cleanup:
1295             args.append('--no-cleanup')
1296         return self.do_rados(self.controller, map(str, args))
1297
1298     def do_put(self, pool, obj, fname, namespace=None):
1299         """
1300         Implement rados put operation
1301         """
1302         args = ['-p', pool]
1303         if namespace is not None:
1304             args += ['-N', namespace]
1305         args += [
1306             'put',
1307             obj,
1308             fname
1309         ]
1310         return self.do_rados(
1311             self.controller,
1312             args,
1313             check_status=False
1314         ).exitstatus
1315
1316     def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1317         """
1318         Implement rados get operation
1319         """
1320         args = ['-p', pool]
1321         if namespace is not None:
1322             args += ['-N', namespace]
1323         args += [
1324             'get',
1325             obj,
1326             fname
1327         ]
1328         return self.do_rados(
1329             self.controller,
1330             args,
1331             check_status=False
1332         ).exitstatus
1333
1334     def do_rm(self, pool, obj, namespace=None):
1335         """
1336         Implement rados rm operation
1337         """
1338         args = ['-p', pool]
1339         if namespace is not None:
1340             args += ['-N', namespace]
1341         args += [
1342             'rm',
1343             obj
1344         ]
1345         return self.do_rados(
1346             self.controller,
1347             args,
1348             check_status=False
1349         ).exitstatus
1350
1351     def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1352         if stdout is None:
1353             stdout = StringIO()
1354         return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1355
1356     def find_remote(self, service_type, service_id):
1357         """
1358         Get the Remote for the host where a particular service runs.
1359
1360         :param service_type: 'mds', 'osd', 'client'
1361         :param service_id: The second part of a role, e.g. '0' for
1362                            the role 'client.0'
1363         :return: a Remote instance for the host where the
1364                  requested role is placed
1365         """
1366         return get_remote(self.ctx, self.cluster,
1367                           service_type, service_id)
1368
1369     def admin_socket(self, service_type, service_id,
1370                      command, check_status=True, timeout=0, stdout=None):
1371         """
1372         Remotely start up ceph specifying the admin socket
1373         :param command: a list of words to use as the command
1374                         to the admin socket
1375         """
1376         if stdout is None:
1377             stdout = StringIO()
1378         testdir = teuthology.get_testdir(self.ctx)
1379         remote = self.find_remote(service_type, service_id)
1380         args = [
1381             'sudo',
1382             'adjust-ulimits',
1383             'ceph-coverage',
1384             '{tdir}/archive/coverage'.format(tdir=testdir),
1385             'timeout',
1386             str(timeout),
1387             'ceph',
1388             '--cluster',
1389             self.cluster,
1390             '--admin-daemon',
1391             '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1392                 cluster=self.cluster,
1393                 type=service_type,
1394                 id=service_id),
1395             ]
1396         args.extend(command)
1397         return remote.run(
1398             args=args,
1399             stdout=stdout,
1400             wait=True,
1401             check_status=check_status
1402             )
1403
1404     def objectstore_tool(self, pool, options, args, **kwargs):
1405         return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1406
1407     def get_pgid(self, pool, pgnum):
1408         """
1409         :param pool: pool name
1410         :param pgnum: pg number
1411         :returns: a string representing this pg.
1412         """
1413         poolnum = self.get_pool_num(pool)
1414         pg_str = "{poolnum}.{pgnum}".format(
1415             poolnum=poolnum,
1416             pgnum=pgnum)
1417         return pg_str
1418
1419     def get_pg_replica(self, pool, pgnum):
1420         """
1421         get replica for pool, pgnum (e.g. (data, 0)->0
1422         """
1423         pg_str = self.get_pgid(pool, pgnum)
1424         output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1425         j = json.loads('\n'.join(output.split('\n')[1:]))
1426         return int(j['acting'][-1])
1427         assert False
1428
1429     def wait_for_pg_stats(func):
1430         # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
1431         # by default, and take the faulty injection in ms into consideration,
1432         # 12 seconds are more than enough
1433         delays = [1, 1, 2, 3, 5, 8, 13]
1434         @wraps(func)
1435         def wrapper(self, *args, **kwargs):
1436             exc = None
1437             for delay in delays:
1438                 try:
1439                     return func(self, *args, **kwargs)
1440                 except AssertionError as e:
1441                     time.sleep(delay)
1442                     exc = e
1443             raise exc
1444         return wrapper
1445
1446     def get_pg_primary(self, pool, pgnum):
1447         """
1448         get primary for pool, pgnum (e.g. (data, 0)->0
1449         """
1450         pg_str = self.get_pgid(pool, pgnum)
1451         output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1452         j = json.loads('\n'.join(output.split('\n')[1:]))
1453         return int(j['acting'][0])
1454         assert False
1455
1456     def get_pool_num(self, pool):
1457         """
1458         get number for pool (e.g., data -> 2)
1459         """
1460         return int(self.get_pool_dump(pool)['pool'])
1461
1462     def list_pools(self):
1463         """
1464         list all pool names
1465         """
1466         osd_dump = self.get_osd_dump_json()
1467         self.log(osd_dump['pools'])
1468         return [str(i['pool_name']) for i in osd_dump['pools']]
1469
1470     def clear_pools(self):
1471         """
1472         remove all pools
1473         """
1474         [self.remove_pool(i) for i in self.list_pools()]
1475
1476     def kick_recovery_wq(self, osdnum):
1477         """
1478         Run kick_recovery_wq on cluster.
1479         """
1480         return self.raw_cluster_cmd(
1481             'tell', "osd.%d" % (int(osdnum),),
1482             'debug',
1483             'kick_recovery_wq',
1484             '0')
1485
1486     def wait_run_admin_socket(self, service_type,
1487                               service_id, args=['version'], timeout=75, stdout=None):
1488         """
1489         If osd_admin_socket call suceeds, return.  Otherwise wait
1490         five seconds and try again.
1491         """
1492         if stdout is None:
1493             stdout = StringIO()
1494         tries = 0
1495         while True:
1496             proc = self.admin_socket(service_type, service_id,
1497                                      args, check_status=False, stdout=stdout)
1498             if proc.exitstatus is 0:
1499                 return proc
1500             else:
1501                 tries += 1
1502                 if (tries * 5) > timeout:
1503                     raise Exception('timed out waiting for admin_socket '
1504                                     'to appear after {type}.{id} restart'.
1505                                     format(type=service_type,
1506                                            id=service_id))
1507                 self.log("waiting on admin_socket for {type}-{id}, "
1508                          "{command}".format(type=service_type,
1509                                             id=service_id,
1510                                             command=args))
1511                 time.sleep(5)
1512
1513     def get_pool_dump(self, pool):
1514         """
1515         get the osd dump part of a pool
1516         """
1517         osd_dump = self.get_osd_dump_json()
1518         for i in osd_dump['pools']:
1519             if i['pool_name'] == pool:
1520                 return i
1521         assert False
1522
1523     def get_config(self, service_type, service_id, name):
1524         """
1525         :param node: like 'mon.a'
1526         :param name: the option name
1527         """
1528         proc = self.wait_run_admin_socket(service_type, service_id,
1529                                           ['config', 'show'])
1530         j = json.loads(proc.stdout.getvalue())
1531         return j[name]
1532
1533     def set_config(self, osdnum, **argdict):
1534         """
1535         :param osdnum: osd number
1536         :param argdict: dictionary containing values to set.
1537         """
1538         for k, v in argdict.iteritems():
1539             self.wait_run_admin_socket(
1540                 'osd', osdnum,
1541                 ['config', 'set', str(k), str(v)])
1542
1543     def raw_cluster_status(self):
1544         """
1545         Get status from cluster
1546         """
1547         status = self.raw_cluster_cmd('status', '--format=json-pretty')
1548         return json.loads(status)
1549
1550     def raw_osd_status(self):
1551         """
1552         Get osd status from cluster
1553         """
1554         return self.raw_cluster_cmd('osd', 'dump')
1555
1556     def get_osd_status(self):
1557         """
1558         Get osd statuses sorted by states that the osds are in.
1559         """
1560         osd_lines = filter(
1561             lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1562             self.raw_osd_status().split('\n'))
1563         self.log(osd_lines)
1564         in_osds = [int(i[4:].split()[0])
1565                    for i in filter(lambda x: " in " in x, osd_lines)]
1566         out_osds = [int(i[4:].split()[0])
1567                     for i in filter(lambda x: " out " in x, osd_lines)]
1568         up_osds = [int(i[4:].split()[0])
1569                    for i in filter(lambda x: " up " in x, osd_lines)]
1570         down_osds = [int(i[4:].split()[0])
1571                      for i in filter(lambda x: " down " in x, osd_lines)]
1572         dead_osds = [int(x.id_)
1573                      for x in filter(lambda x:
1574                                      not x.running(),
1575                                      self.ctx.daemons.
1576                                      iter_daemons_of_role('osd', self.cluster))]
1577         live_osds = [int(x.id_) for x in
1578                      filter(lambda x:
1579                             x.running(),
1580                             self.ctx.daemons.iter_daemons_of_role('osd',
1581                                                                   self.cluster))]
1582         return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1583                 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1584                 'raw': osd_lines}
1585
1586     def get_num_pgs(self):
1587         """
1588         Check cluster status for the number of pgs
1589         """
1590         status = self.raw_cluster_status()
1591         self.log(status)
1592         return status['pgmap']['num_pgs']
1593
1594     def create_erasure_code_profile(self, profile_name, profile):
1595         """
1596         Create an erasure code profile name that can be used as a parameter
1597         when creating an erasure coded pool.
1598         """
1599         with self.lock:
1600             args = cmd_erasure_code_profile(profile_name, profile)
1601             self.raw_cluster_cmd(*args)
1602
1603     def create_pool_with_unique_name(self, pg_num=16,
1604                                      erasure_code_profile_name=None,
1605                                      min_size=None,
1606                                      erasure_code_use_overwrites=False):
1607         """
1608         Create a pool named unique_pool_X where X is unique.
1609         """
1610         name = ""
1611         with self.lock:
1612             name = "unique_pool_%s" % (str(self.next_pool_id),)
1613             self.next_pool_id += 1
1614             self.create_pool(
1615                 name,
1616                 pg_num,
1617                 erasure_code_profile_name=erasure_code_profile_name,
1618                 min_size=min_size,
1619                 erasure_code_use_overwrites=erasure_code_use_overwrites)
1620         return name
1621
1622     @contextlib.contextmanager
1623     def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1624         self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1625         yield
1626         self.remove_pool(pool_name)
1627
1628     def create_pool(self, pool_name, pg_num=16,
1629                     erasure_code_profile_name=None,
1630                     min_size=None,
1631                     erasure_code_use_overwrites=False):
1632         """
1633         Create a pool named from the pool_name parameter.
1634         :param pool_name: name of the pool being created.
1635         :param pg_num: initial number of pgs.
1636         :param erasure_code_profile_name: if set and !None create an
1637                                           erasure coded pool using the profile
1638         :param erasure_code_use_overwrites: if true, allow overwrites
1639         """
1640         with self.lock:
1641             assert isinstance(pool_name, basestring)
1642             assert isinstance(pg_num, int)
1643             assert pool_name not in self.pools
1644             self.log("creating pool_name %s" % (pool_name,))
1645             if erasure_code_profile_name:
1646                 self.raw_cluster_cmd('osd', 'pool', 'create',
1647                                      pool_name, str(pg_num), str(pg_num),
1648                                      'erasure', erasure_code_profile_name)
1649             else:
1650                 self.raw_cluster_cmd('osd', 'pool', 'create',
1651                                      pool_name, str(pg_num))
1652             if min_size is not None:
1653                 self.raw_cluster_cmd(
1654                     'osd', 'pool', 'set', pool_name,
1655                     'min_size',
1656                     str(min_size))
1657             if erasure_code_use_overwrites:
1658                 self.raw_cluster_cmd(
1659                     'osd', 'pool', 'set', pool_name,
1660                     'allow_ec_overwrites',
1661                     'true')
1662             self.raw_cluster_cmd(
1663                 'osd', 'pool', 'application', 'enable',
1664                 pool_name, 'rados', '--yes-i-really-mean-it',
1665                 run.Raw('||'), 'true')
1666             self.pools[pool_name] = pg_num
1667         time.sleep(1)
1668
1669     def add_pool_snap(self, pool_name, snap_name):
1670         """
1671         Add pool snapshot
1672         :param pool_name: name of pool to snapshot
1673         :param snap_name: name of snapshot to take
1674         """
1675         self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1676                              str(pool_name), str(snap_name))
1677
1678     def remove_pool_snap(self, pool_name, snap_name):
1679         """
1680         Remove pool snapshot
1681         :param pool_name: name of pool to snapshot
1682         :param snap_name: name of snapshot to remove
1683         """
1684         self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1685                              str(pool_name), str(snap_name))
1686
1687     def remove_pool(self, pool_name):
1688         """
1689         Remove the indicated pool
1690         :param pool_name: Pool to be removed
1691         """
1692         with self.lock:
1693             assert isinstance(pool_name, basestring)
1694             assert pool_name in self.pools
1695             self.log("removing pool_name %s" % (pool_name,))
1696             del self.pools[pool_name]
1697             self.do_rados(self.controller,
1698                           ['rmpool', pool_name, pool_name,
1699                            "--yes-i-really-really-mean-it"])
1700
1701     def get_pool(self):
1702         """
1703         Pick a random pool
1704         """
1705         with self.lock:
1706             return random.choice(self.pools.keys())
1707
1708     def get_pool_pg_num(self, pool_name):
1709         """
1710         Return the number of pgs in the pool specified.
1711         """
1712         with self.lock:
1713             assert isinstance(pool_name, basestring)
1714             if pool_name in self.pools:
1715                 return self.pools[pool_name]
1716             return 0
1717
1718     def get_pool_property(self, pool_name, prop):
1719         """
1720         :param pool_name: pool
1721         :param prop: property to be checked.
1722         :returns: property as an int value.
1723         """
1724         with self.lock:
1725             assert isinstance(pool_name, basestring)
1726             assert isinstance(prop, basestring)
1727             output = self.raw_cluster_cmd(
1728                 'osd',
1729                 'pool',
1730                 'get',
1731                 pool_name,
1732                 prop)
1733             return int(output.split()[1])
1734
1735     def set_pool_property(self, pool_name, prop, val):
1736         """
1737         :param pool_name: pool
1738         :param prop: property to be set.
1739         :param val: value to set.
1740
1741         This routine retries if set operation fails.
1742         """
1743         with self.lock:
1744             assert isinstance(pool_name, basestring)
1745             assert isinstance(prop, basestring)
1746             assert isinstance(val, int)
1747             tries = 0
1748             while True:
1749                 r = self.raw_cluster_cmd_result(
1750                     'osd',
1751                     'pool',
1752                     'set',
1753                     pool_name,
1754                     prop,
1755                     str(val))
1756                 if r != 11:  # EAGAIN
1757                     break
1758                 tries += 1
1759                 if tries > 50:
1760                     raise Exception('timed out getting EAGAIN '
1761                                     'when setting pool property %s %s = %s' %
1762                                     (pool_name, prop, val))
1763                 self.log('got EAGAIN setting pool property, '
1764                          'waiting a few seconds...')
1765                 time.sleep(2)
1766
1767     def expand_pool(self, pool_name, by, max_pgs):
1768         """
1769         Increase the number of pgs in a pool
1770         """
1771         with self.lock:
1772             assert isinstance(pool_name, basestring)
1773             assert isinstance(by, int)
1774             assert pool_name in self.pools
1775             if self.get_num_creating() > 0:
1776                 return False
1777             if (self.pools[pool_name] + by) > max_pgs:
1778                 return False
1779             self.log("increase pool size by %d" % (by,))
1780             new_pg_num = self.pools[pool_name] + by
1781             self.set_pool_property(pool_name, "pg_num", new_pg_num)
1782             self.pools[pool_name] = new_pg_num
1783             return True
1784
1785     def set_pool_pgpnum(self, pool_name, force):
1786         """
1787         Set pgpnum property of pool_name pool.
1788         """
1789         with self.lock:
1790             assert isinstance(pool_name, basestring)
1791             assert pool_name in self.pools
1792             if not force and self.get_num_creating() > 0:
1793                 return False
1794             self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
1795             return True
1796
1797     def list_pg_missing(self, pgid):
1798         """
1799         return list of missing pgs with the id specified
1800         """
1801         r = None
1802         offset = {}
1803         while True:
1804             out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
1805                                        json.dumps(offset))
1806             j = json.loads(out)
1807             if r is None:
1808                 r = j
1809             else:
1810                 r['objects'].extend(j['objects'])
1811             if not 'more' in j:
1812                 break
1813             if j['more'] == 0:
1814                 break
1815             offset = j['objects'][-1]['oid']
1816         if 'more' in r:
1817             del r['more']
1818         return r
1819
1820     def get_pg_stats(self):
1821         """
1822         Dump the cluster and get pg stats
1823         """
1824         out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
1825         j = json.loads('\n'.join(out.split('\n')[1:]))
1826         return j['pg_stats']
1827
1828     def get_pgids_to_force(self, backfill):
1829         """
1830         Return the randomized list of PGs that can have their recovery/backfill forced
1831         """
1832         j = self.get_pg_stats();
1833         pgids = []
1834         if backfill:
1835             wanted = ['degraded', 'backfilling', 'backfill_wait']
1836         else:
1837             wanted = ['recovering', 'degraded', 'recovery_wait']
1838         for pg in j:
1839             status = pg['state'].split('+')
1840             for t in wanted:
1841                 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
1842                     pgids.append(pg['pgid'])
1843                     break
1844         return pgids
1845
1846     def get_pgids_to_cancel_force(self, backfill):
1847        """
1848        Return the randomized list of PGs whose recovery/backfill priority is forced
1849        """
1850        j = self.get_pg_stats();
1851        pgids = []
1852        if backfill:
1853            wanted = 'forced_backfill'
1854        else:
1855            wanted = 'forced_recovery'
1856        for pg in j:
1857            status = pg['state'].split('+')
1858            if wanted in status and random.random() > 0.5:
1859                pgids.append(pg['pgid'])
1860        return pgids
1861
1862     def compile_pg_status(self):
1863         """
1864         Return a histogram of pg state values
1865         """
1866         ret = {}
1867         j = self.get_pg_stats()
1868         for pg in j:
1869             for status in pg['state'].split('+'):
1870                 if status not in ret:
1871                     ret[status] = 0
1872                 ret[status] += 1
1873         return ret
1874
1875     @wait_for_pg_stats
1876     def with_pg_state(self, pool, pgnum, check):
1877         pgstr = self.get_pgid(pool, pgnum)
1878         stats = self.get_single_pg_stats(pgstr)
1879         assert(check(stats['state']))
1880
1881     @wait_for_pg_stats
1882     def with_pg(self, pool, pgnum, check):
1883         pgstr = self.get_pgid(pool, pgnum)
1884         stats = self.get_single_pg_stats(pgstr)
1885         return check(stats)
1886
1887     def get_last_scrub_stamp(self, pool, pgnum):
1888         """
1889         Get the timestamp of the last scrub.
1890         """
1891         stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
1892         return stats["last_scrub_stamp"]
1893
1894     def do_pg_scrub(self, pool, pgnum, stype):
1895         """
1896         Scrub pg and wait for scrubbing to finish
1897         """
1898         init = self.get_last_scrub_stamp(pool, pgnum)
1899         RESEND_TIMEOUT = 120    # Must be a multiple of SLEEP_TIME
1900         FATAL_TIMEOUT = RESEND_TIMEOUT * 3
1901         SLEEP_TIME = 10
1902         timer = 0
1903         while init == self.get_last_scrub_stamp(pool, pgnum):
1904             assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
1905             self.log("waiting for scrub type %s" % (stype,))
1906             if (timer % RESEND_TIMEOUT) == 0:
1907                 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
1908                 # The first time in this loop is the actual request
1909                 if timer != 0 and stype == "repair":
1910                     self.log("WARNING: Resubmitted a non-idempotent repair")
1911             time.sleep(SLEEP_TIME)
1912             timer += SLEEP_TIME
1913
1914     def wait_snap_trimming_complete(self, pool):
1915         """
1916         Wait for snap trimming on pool to end
1917         """
1918         POLL_PERIOD = 10
1919         FATAL_TIMEOUT = 600
1920         start = time.time()
1921         poolnum = self.get_pool_num(pool)
1922         poolnumstr = "%s." % (poolnum,)
1923         while (True):
1924             now = time.time()
1925             if (now - start) > FATAL_TIMEOUT:
1926                 assert (now - start) < FATAL_TIMEOUT, \
1927                     'failed to complete snap trimming before timeout'
1928             all_stats = self.get_pg_stats()
1929             trimming = False
1930             for pg in all_stats:
1931                 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
1932                     self.log("pg {pg} in trimming, state: {state}".format(
1933                         pg=pg['pgid'],
1934                         state=pg['state']))
1935                     trimming = True
1936             if not trimming:
1937                 break
1938             self.log("{pool} still trimming, waiting".format(pool=pool))
1939             time.sleep(POLL_PERIOD)
1940
1941     def get_single_pg_stats(self, pgid):
1942         """
1943         Return pg for the pgid specified.
1944         """
1945         all_stats = self.get_pg_stats()
1946
1947         for pg in all_stats:
1948             if pg['pgid'] == pgid:
1949                 return pg
1950
1951         return None
1952
1953     def get_object_pg_with_shard(self, pool, name, osdid):
1954         """
1955         """
1956         pool_dump = self.get_pool_dump(pool)
1957         object_map = self.get_object_map(pool, name)
1958         if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
1959             shard = object_map['acting'].index(osdid)
1960             return "{pgid}s{shard}".format(pgid=object_map['pgid'],
1961                                            shard=shard)
1962         else:
1963             return object_map['pgid']
1964
1965     def get_object_primary(self, pool, name):
1966         """
1967         """
1968         object_map = self.get_object_map(pool, name)
1969         return object_map['acting_primary']
1970
1971     def get_object_map(self, pool, name):
1972         """
1973         osd map --format=json converted to a python object
1974         :returns: the python object
1975         """
1976         out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
1977         return json.loads('\n'.join(out.split('\n')[1:]))
1978
1979     def get_osd_dump_json(self):
1980         """
1981         osd dump --format=json converted to a python object
1982         :returns: the python object
1983         """
1984         out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
1985         return json.loads('\n'.join(out.split('\n')[1:]))
1986
1987     def get_osd_dump(self):
1988         """
1989         Dump osds
1990         :returns: all osds
1991         """
1992         return self.get_osd_dump_json()['osds']
1993
1994     def get_mgr_dump(self):
1995         out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
1996         return json.loads(out)
1997
1998     def get_stuck_pgs(self, type_, threshold):
1999         """
2000         :returns: stuck pg information from the cluster
2001         """
2002         out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2003                                    '--format=json')
2004         return json.loads(out)
2005
2006     def get_num_unfound_objects(self):
2007         """
2008         Check cluster status to get the number of unfound objects
2009         """
2010         status = self.raw_cluster_status()
2011         self.log(status)
2012         return status['pgmap'].get('unfound_objects', 0)
2013
2014     def get_num_creating(self):
2015         """
2016         Find the number of pgs in creating mode.
2017         """
2018         pgs = self.get_pg_stats()
2019         num = 0
2020         for pg in pgs:
2021             if 'creating' in pg['state']:
2022                 num += 1
2023         return num
2024
2025     def get_num_active_clean(self):
2026         """
2027         Find the number of active and clean pgs.
2028         """
2029         pgs = self.get_pg_stats()
2030         num = 0
2031         for pg in pgs:
2032             if (pg['state'].count('active') and
2033                     pg['state'].count('clean') and
2034                     not pg['state'].count('stale')):
2035                 num += 1
2036         return num
2037
2038     def get_num_active_recovered(self):
2039         """
2040         Find the number of active and recovered pgs.
2041         """
2042         pgs = self.get_pg_stats()
2043         num = 0
2044         for pg in pgs:
2045             if (pg['state'].count('active') and
2046                     not pg['state'].count('recover') and
2047                     not pg['state'].count('backfilling') and
2048                     not pg['state'].count('stale')):
2049                 num += 1
2050         return num
2051
2052     def get_is_making_recovery_progress(self):
2053         """
2054         Return whether there is recovery progress discernable in the
2055         raw cluster status
2056         """
2057         status = self.raw_cluster_status()
2058         kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2059         bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2060         ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2061         return kps > 0 or bps > 0 or ops > 0
2062
2063     def get_num_active(self):
2064         """
2065         Find the number of active pgs.
2066         """
2067         pgs = self.get_pg_stats()
2068         num = 0
2069         for pg in pgs:
2070             if pg['state'].count('active') and not pg['state'].count('stale'):
2071                 num += 1
2072         return num
2073
2074     def get_num_down(self):
2075         """
2076         Find the number of pgs that are down.
2077         """
2078         pgs = self.get_pg_stats()
2079         num = 0
2080         for pg in pgs:
2081             if ((pg['state'].count('down') and not
2082                     pg['state'].count('stale')) or
2083                 (pg['state'].count('incomplete') and not
2084                     pg['state'].count('stale'))):
2085                 num += 1
2086         return num
2087
2088     def get_num_active_down(self):
2089         """
2090         Find the number of pgs that are either active or down.
2091         """
2092         pgs = self.get_pg_stats()
2093         num = 0
2094         for pg in pgs:
2095             if ((pg['state'].count('active') and not
2096                     pg['state'].count('stale')) or
2097                 (pg['state'].count('down') and not
2098                     pg['state'].count('stale')) or
2099                 (pg['state'].count('incomplete') and not
2100                     pg['state'].count('stale'))):
2101                 num += 1
2102         return num
2103
2104     def is_clean(self):
2105         """
2106         True if all pgs are clean
2107         """
2108         return self.get_num_active_clean() == self.get_num_pgs()
2109
2110     def is_recovered(self):
2111         """
2112         True if all pgs have recovered
2113         """
2114         return self.get_num_active_recovered() == self.get_num_pgs()
2115
2116     def is_active_or_down(self):
2117         """
2118         True if all pgs are active or down
2119         """
2120         return self.get_num_active_down() == self.get_num_pgs()
2121
2122     def wait_for_clean(self, timeout=None):
2123         """
2124         Returns true when all pgs are clean.
2125         """
2126         self.log("waiting for clean")
2127         start = time.time()
2128         num_active_clean = self.get_num_active_clean()
2129         while not self.is_clean():
2130             if timeout is not None:
2131                 if self.get_is_making_recovery_progress():
2132                     self.log("making progress, resetting timeout")
2133                     start = time.time()
2134                 else:
2135                     self.log("no progress seen, keeping timeout for now")
2136                     if time.time() - start >= timeout:
2137                         self.log('dumping pgs')
2138                         out = self.raw_cluster_cmd('pg', 'dump')
2139                         self.log(out)
2140                         assert time.time() - start < timeout, \
2141                             'failed to become clean before timeout expired'
2142             cur_active_clean = self.get_num_active_clean()
2143             if cur_active_clean != num_active_clean:
2144                 start = time.time()
2145                 num_active_clean = cur_active_clean
2146             time.sleep(3)
2147         self.log("clean!")
2148
2149     def are_all_osds_up(self):
2150         """
2151         Returns true if all osds are up.
2152         """
2153         x = self.get_osd_dump()
2154         return (len(x) == sum([(y['up'] > 0) for y in x]))
2155
2156     def wait_for_all_osds_up(self, timeout=None):
2157         """
2158         When this exits, either the timeout has expired, or all
2159         osds are up.
2160         """
2161         self.log("waiting for all up")
2162         start = time.time()
2163         while not self.are_all_osds_up():
2164             if timeout is not None:
2165                 assert time.time() - start < timeout, \
2166                     'timeout expired in wait_for_all_osds_up'
2167             time.sleep(3)
2168         self.log("all up!")
2169
2170     def pool_exists(self, pool):
2171         if pool in self.list_pools():
2172             return True
2173         return False
2174
2175     def wait_for_pool(self, pool, timeout=300):
2176         """
2177         Wait for a pool to exist
2178         """
2179         self.log('waiting for pool %s to exist' % pool)
2180         start = time.time()
2181         while not self.pool_exists(pool):
2182             if timeout is not None:
2183                 assert time.time() - start < timeout, \
2184                     'timeout expired in wait_for_pool'
2185             time.sleep(3)
2186
2187     def wait_for_pools(self, pools):
2188         for pool in pools:
2189             self.wait_for_pool(pool)
2190
2191     def is_mgr_available(self):
2192         x = self.get_mgr_dump()
2193         return x.get('available', False)
2194
2195     def wait_for_mgr_available(self, timeout=None):
2196         self.log("waiting for mgr available")
2197         start = time.time()
2198         while not self.is_mgr_available():
2199             if timeout is not None:
2200                 assert time.time() - start < timeout, \
2201                     'timeout expired in wait_for_mgr_available'
2202             time.sleep(3)
2203         self.log("mgr available!")
2204
2205     def wait_for_recovery(self, timeout=None):
2206         """
2207         Check peering. When this exists, we have recovered.
2208         """
2209         self.log("waiting for recovery to complete")
2210         start = time.time()
2211         num_active_recovered = self.get_num_active_recovered()
2212         while not self.is_recovered():
2213             now = time.time()
2214             if timeout is not None:
2215                 if self.get_is_making_recovery_progress():
2216                     self.log("making progress, resetting timeout")
2217                     start = time.time()
2218                 else:
2219                     self.log("no progress seen, keeping timeout for now")
2220                     if now - start >= timeout:
2221                         if self.is_recovered():
2222                             break
2223                         self.log('dumping pgs')
2224                         out = self.raw_cluster_cmd('pg', 'dump')
2225                         self.log(out)
2226                         assert now - start < timeout, \
2227                             'failed to recover before timeout expired'
2228             cur_active_recovered = self.get_num_active_recovered()
2229             if cur_active_recovered != num_active_recovered:
2230                 start = time.time()
2231                 num_active_recovered = cur_active_recovered
2232             time.sleep(3)
2233         self.log("recovered!")
2234
2235     def wait_for_active(self, timeout=None):
2236         """
2237         Check peering. When this exists, we are definitely active
2238         """
2239         self.log("waiting for peering to complete")
2240         start = time.time()
2241         num_active = self.get_num_active()
2242         while not self.is_active():
2243             if timeout is not None:
2244                 if time.time() - start >= timeout:
2245                     self.log('dumping pgs')
2246                     out = self.raw_cluster_cmd('pg', 'dump')
2247                     self.log(out)
2248                     assert time.time() - start < timeout, \
2249                         'failed to recover before timeout expired'
2250             cur_active = self.get_num_active()
2251             if cur_active != num_active:
2252                 start = time.time()
2253                 num_active = cur_active
2254             time.sleep(3)
2255         self.log("active!")
2256
2257     def wait_for_active_or_down(self, timeout=None):
2258         """
2259         Check peering. When this exists, we are definitely either
2260         active or down
2261         """
2262         self.log("waiting for peering to complete or become blocked")
2263         start = time.time()
2264         num_active_down = self.get_num_active_down()
2265         while not self.is_active_or_down():
2266             if timeout is not None:
2267                 if time.time() - start >= timeout:
2268                     self.log('dumping pgs')
2269                     out = self.raw_cluster_cmd('pg', 'dump')
2270                     self.log(out)
2271                     assert time.time() - start < timeout, \
2272                         'failed to recover before timeout expired'
2273             cur_active_down = self.get_num_active_down()
2274             if cur_active_down != num_active_down:
2275                 start = time.time()
2276                 num_active_down = cur_active_down
2277             time.sleep(3)
2278         self.log("active or down!")
2279
2280     def osd_is_up(self, osd):
2281         """
2282         Wrapper for osd check
2283         """
2284         osds = self.get_osd_dump()
2285         return osds[osd]['up'] > 0
2286
2287     def wait_till_osd_is_up(self, osd, timeout=None):
2288         """
2289         Loop waiting for osd.
2290         """
2291         self.log('waiting for osd.%d to be up' % osd)
2292         start = time.time()
2293         while not self.osd_is_up(osd):
2294             if timeout is not None:
2295                 assert time.time() - start < timeout, \
2296                     'osd.%d failed to come up before timeout expired' % osd
2297             time.sleep(3)
2298         self.log('osd.%d is up' % osd)
2299
2300     def is_active(self):
2301         """
2302         Wrapper to check if all pgs are active
2303         """
2304         return self.get_num_active() == self.get_num_pgs()
2305
2306     def wait_till_active(self, timeout=None):
2307         """
2308         Wait until all pgs are active.
2309         """
2310         self.log("waiting till active")
2311         start = time.time()
2312         while not self.is_active():
2313             if timeout is not None:
2314                 if time.time() - start >= timeout:
2315                     self.log('dumping pgs')
2316                     out = self.raw_cluster_cmd('pg', 'dump')
2317                     self.log(out)
2318                     assert time.time() - start < timeout, \
2319                         'failed to become active before timeout expired'
2320             time.sleep(3)
2321         self.log("active!")
2322
2323     def wait_till_pg_convergence(self, timeout=None):
2324         start = time.time()
2325         old_stats = None
2326         active_osds = [osd['osd'] for osd in self.get_osd_dump()
2327                        if osd['in'] and osd['up']]
2328         while True:
2329             # strictly speaking, no need to wait for mon. but due to the
2330             # "ms inject socket failures" setting, the osdmap could be delayed,
2331             # so mgr is likely to ignore the pg-stat messages with pgs serving
2332             # newly created pools which is not yet known by mgr. so, to make sure
2333             # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
2334             # necessary.
2335             self.flush_pg_stats(active_osds)
2336             new_stats = dict((stat['pgid'], stat['state'])
2337                              for stat in self.get_pg_stats())
2338             if old_stats == new_stats:
2339                 return old_stats
2340             if timeout is not None:
2341                 assert time.time() - start < timeout, \
2342                     'failed to reach convergence before %d secs' % timeout
2343             old_stats = new_stats
2344             # longer than mgr_stats_period
2345             time.sleep(5 + 1)
2346
2347     def mark_out_osd(self, osd):
2348         """
2349         Wrapper to mark osd out.
2350         """
2351         self.raw_cluster_cmd('osd', 'out', str(osd))
2352
2353     def kill_osd(self, osd):
2354         """
2355         Kill osds by either power cycling (if indicated by the config)
2356         or by stopping.
2357         """
2358         if self.config.get('powercycle'):
2359             remote = self.find_remote('osd', osd)
2360             self.log('kill_osd on osd.{o} '
2361                      'doing powercycle of {s}'.format(o=osd, s=remote.name))
2362             self._assert_ipmi(remote)
2363             remote.console.power_off()
2364         elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2365             if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2366                 self.raw_cluster_cmd(
2367                     '--', 'tell', 'osd.%d' % osd,
2368                     'injectargs',
2369                     '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
2370                 )
2371                 try:
2372                     self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2373                 except:
2374                     pass
2375                 else:
2376                     raise RuntimeError('osd.%s did not fail' % osd)
2377             else:
2378                 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2379         else:
2380             self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2381
2382     @staticmethod
2383     def _assert_ipmi(remote):
2384         assert remote.console.has_ipmi_credentials, (
2385             "powercycling requested but RemoteConsole is not "
2386             "initialized.  Check ipmi config.")
2387
2388     def blackhole_kill_osd(self, osd):
2389         """
2390         Stop osd if nothing else works.
2391         """
2392         self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
2393                              'injectargs',
2394                              '--objectstore-blackhole')
2395         time.sleep(2)
2396         self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2397
2398     def revive_osd(self, osd, timeout=360, skip_admin_check=False):
2399         """
2400         Revive osds by either power cycling (if indicated by the config)
2401         or by restarting.
2402         """
2403         if self.config.get('powercycle'):
2404             remote = self.find_remote('osd', osd)
2405             self.log('kill_osd on osd.{o} doing powercycle of {s}'.
2406                      format(o=osd, s=remote.name))
2407             self._assert_ipmi(remote)
2408             remote.console.power_on()
2409             if not remote.console.check_status(300):
2410                 raise Exception('Failed to revive osd.{o} via ipmi'.
2411                                 format(o=osd))
2412             teuthology.reconnect(self.ctx, 60, [remote])
2413             mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2414             self.make_admin_daemon_dir(remote)
2415             self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2416         self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2417
2418         if not skip_admin_check:
2419             # wait for dump_ops_in_flight; this command doesn't appear
2420             # until after the signal handler is installed and it is safe
2421             # to stop the osd again without making valgrind leak checks
2422             # unhappy.  see #5924.
2423             self.wait_run_admin_socket('osd', osd,
2424                                        args=['dump_ops_in_flight'],
2425                                        timeout=timeout, stdout=DEVNULL)
2426
2427     def mark_down_osd(self, osd):
2428         """
2429         Cluster command wrapper
2430         """
2431         self.raw_cluster_cmd('osd', 'down', str(osd))
2432
2433     def mark_in_osd(self, osd):
2434         """
2435         Cluster command wrapper
2436         """
2437         self.raw_cluster_cmd('osd', 'in', str(osd))
2438
2439     def signal_osd(self, osd, sig, silent=False):
2440         """
2441         Wrapper to local get_daemon call which sends the given
2442         signal to the given osd.
2443         """
2444         self.ctx.daemons.get_daemon('osd', osd,
2445                                     self.cluster).signal(sig, silent=silent)
2446
2447     ## monitors
2448     def signal_mon(self, mon, sig, silent=False):
2449         """
2450         Wrapper to local get_deamon call
2451         """
2452         self.ctx.daemons.get_daemon('mon', mon,
2453                                     self.cluster).signal(sig, silent=silent)
2454
2455     def kill_mon(self, mon):
2456         """
2457         Kill the monitor by either power cycling (if the config says so),
2458         or by doing a stop.
2459         """
2460         if self.config.get('powercycle'):
2461             remote = self.find_remote('mon', mon)
2462             self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2463                      format(m=mon, s=remote.name))
2464             self._assert_ipmi(remote)
2465             remote.console.power_off()
2466         else:
2467             self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2468
2469     def revive_mon(self, mon):
2470         """
2471         Restart by either power cycling (if the config says so),
2472         or by doing a normal restart.
2473         """
2474         if self.config.get('powercycle'):
2475             remote = self.find_remote('mon', mon)
2476             self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2477                      format(m=mon, s=remote.name))
2478             self._assert_ipmi(remote)
2479             remote.console.power_on()
2480             self.make_admin_daemon_dir(remote)
2481         self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2482
2483     def revive_mgr(self, mgr):
2484         """
2485         Restart by either power cycling (if the config says so),
2486         or by doing a normal restart.
2487         """
2488         if self.config.get('powercycle'):
2489             remote = self.find_remote('mgr', mgr)
2490             self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2491                      format(m=mgr, s=remote.name))
2492             self._assert_ipmi(remote)
2493             remote.console.power_on()
2494             self.make_admin_daemon_dir(remote)
2495         self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2496
2497     def get_mon_status(self, mon):
2498         """
2499         Extract all the monitor status information from the cluster
2500         """
2501         addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
2502         out = self.raw_cluster_cmd('-m', addr, 'mon_status')
2503         return json.loads(out)
2504
2505     def get_mon_quorum(self):
2506         """
2507         Extract monitor quorum information from the cluster
2508         """
2509         out = self.raw_cluster_cmd('quorum_status')
2510         j = json.loads(out)
2511         self.log('quorum_status is %s' % out)
2512         return j['quorum']
2513
2514     def wait_for_mon_quorum_size(self, size, timeout=300):
2515         """
2516         Loop until quorum size is reached.
2517         """
2518         self.log('waiting for quorum size %d' % size)
2519         start = time.time()
2520         while not len(self.get_mon_quorum()) == size:
2521             if timeout is not None:
2522                 assert time.time() - start < timeout, \
2523                     ('failed to reach quorum size %d '
2524                      'before timeout expired' % size)
2525             time.sleep(3)
2526         self.log("quorum is size %d" % size)
2527
2528     def get_mon_health(self, debug=False):
2529         """
2530         Extract all the monitor health information.
2531         """
2532         out = self.raw_cluster_cmd('health', '--format=json')
2533         if debug:
2534             self.log('health:\n{h}'.format(h=out))
2535         return json.loads(out)
2536
2537     def get_mds_status(self, mds):
2538         """
2539         Run cluster commands for the mds in order to get mds information
2540         """
2541         out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
2542         j = json.loads(' '.join(out.splitlines()[1:]))
2543         # collate; for dup ids, larger gid wins.
2544         for info in j['info'].itervalues():
2545             if info['name'] == mds:
2546                 return info
2547         return None
2548
2549     def get_filepath(self):
2550         """
2551         Return path to osd data with {id} needing to be replaced
2552         """
2553         return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2554
2555     def make_admin_daemon_dir(self, remote):
2556         """
2557         Create /var/run/ceph directory on remote site.
2558
2559         :param ctx: Context
2560         :param remote: Remote site
2561         """
2562         remote.run(args=['sudo',
2563                          'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2564
2565
2566 def utility_task(name):
2567     """
2568     Generate ceph_manager subtask corresponding to ceph_manager
2569     method name
2570     """
2571     def task(ctx, config):
2572         if config is None:
2573             config = {}
2574         args = config.get('args', [])
2575         kwargs = config.get('kwargs', {})
2576         cluster = config.get('cluster', 'ceph')
2577         fn = getattr(ctx.managers[cluster], name)
2578         fn(*args, **kwargs)
2579     return task
2580
2581 revive_osd = utility_task("revive_osd")
2582 revive_mon = utility_task("revive_mon")
2583 kill_osd = utility_task("kill_osd")
2584 kill_mon = utility_task("kill_mon")
2585 create_pool = utility_task("create_pool")
2586 remove_pool = utility_task("remove_pool")
2587 wait_for_clean = utility_task("wait_for_clean")
2588 flush_all_pg_stats = utility_task("flush_all_pg_stats")
2589 set_pool_property = utility_task("set_pool_property")
2590 do_pg_scrub = utility_task("do_pg_scrub")
2591 wait_for_pool = utility_task("wait_for_pool")
2592 wait_for_pools = utility_task("wait_for_pools")