Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / cephfs / filesystem.py
1
2 from StringIO import StringIO
3 import json
4 import logging
5 from gevent import Greenlet
6 import os
7 import time
8 import datetime
9 import re
10 import errno
11 import random
12
13 from teuthology.exceptions import CommandFailedError
14 from teuthology import misc
15 from teuthology.nuke import clear_firewall
16 from teuthology.parallel import parallel
17 from tasks.ceph_manager import write_conf
18 from tasks import ceph_manager
19
20
21 log = logging.getLogger(__name__)
22
23
24 DAEMON_WAIT_TIMEOUT = 120
25 ROOT_INO = 1
26
27
28 class ObjectNotFound(Exception):
29     def __init__(self, object_name):
30         self._object_name = object_name
31
32     def __str__(self):
33         return "Object not found: '{0}'".format(self._object_name)
34
35 class FSStatus(object):
36     """
37     Operations on a snapshot of the FSMap.
38     """
39     def __init__(self, mon_manager):
40         self.mon = mon_manager
41         self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
42
43     def __str__(self):
44         return json.dumps(self.map, indent = 2, sort_keys = True)
45
46     # Expose the fsmap for manual inspection.
47     def __getitem__(self, key):
48         """
49         Get a field from the fsmap.
50         """
51         return self.map[key]
52
53     def get_filesystems(self):
54         """
55         Iterator for all filesystems.
56         """
57         for fs in self.map['filesystems']:
58             yield fs
59
60     def get_all(self):
61         """
62         Iterator for all the mds_info components in the FSMap.
63         """
64         for info in self.get_standbys():
65             yield info
66         for fs in self.map['filesystems']:
67             for info in fs['mdsmap']['info'].values():
68                 yield info
69
70     def get_standbys(self):
71         """
72         Iterator for all standbys.
73         """
74         for info in self.map['standbys']:
75             yield info
76
77     def get_fsmap(self, fscid):
78         """
79         Get the fsmap for the given FSCID.
80         """
81         for fs in self.map['filesystems']:
82             if fscid is None or fs['id'] == fscid:
83                 return fs
84         raise RuntimeError("FSCID {0} not in map".format(fscid))
85
86     def get_fsmap_byname(self, name):
87         """
88         Get the fsmap for the given file system name.
89         """
90         for fs in self.map['filesystems']:
91             if name is None or fs['mdsmap']['fs_name'] == name:
92                 return fs
93         raise RuntimeError("FS {0} not in map".format(name))
94
95     def get_replays(self, fscid):
96         """
97         Get the standby:replay MDS for the given FSCID.
98         """
99         fs = self.get_fsmap(fscid)
100         for info in fs['mdsmap']['info'].values():
101             if info['state'] == 'up:standby-replay':
102                 yield info
103
104     def get_ranks(self, fscid):
105         """
106         Get the ranks for the given FSCID.
107         """
108         fs = self.get_fsmap(fscid)
109         for info in fs['mdsmap']['info'].values():
110             if info['rank'] >= 0:
111                 yield info
112
113     def get_rank(self, fscid, rank):
114         """
115         Get the rank for the given FSCID.
116         """
117         for info in self.get_ranks(fscid):
118             if info['rank'] == rank:
119                 return info
120         raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
121
122     def get_mds(self, name):
123         """
124         Get the info for the given MDS name.
125         """
126         for info in self.get_all():
127             if info['name'] == name:
128                 return info
129         return None
130
131     def get_mds_addr(self, name):
132         """
133         Return the instance addr as a string, like "10.214.133.138:6807\/10825"
134         """
135         info = self.get_mds(name)
136         if info:
137             return info['addr']
138         else:
139             log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
140             raise RuntimeError("MDS id '{0}' not found in map".format(name))
141
142 class CephCluster(object):
143     @property
144     def admin_remote(self):
145         first_mon = misc.get_first_mon(self._ctx, None)
146         (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
147         return result
148
149     def __init__(self, ctx):
150         self._ctx = ctx
151         self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
152
153     def get_config(self, key, service_type=None):
154         """
155         Get config from mon by default, or a specific service if caller asks for it
156         """
157         if service_type is None:
158             service_type = 'mon'
159
160         service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
161         return self.json_asok(['config', 'get', key], service_type, service_id)[key]
162
163     def set_ceph_conf(self, subsys, key, value):
164         if subsys not in self._ctx.ceph['ceph'].conf:
165             self._ctx.ceph['ceph'].conf[subsys] = {}
166         self._ctx.ceph['ceph'].conf[subsys][key] = value
167         write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
168                                # used a different config path this won't work.
169
170     def clear_ceph_conf(self, subsys, key):
171         del self._ctx.ceph['ceph'].conf[subsys][key]
172         write_conf(self._ctx)
173
174     def json_asok(self, command, service_type, service_id):
175         proc = self.mon_manager.admin_socket(service_type, service_id, command)
176         response_data = proc.stdout.getvalue()
177         log.info("_json_asok output: {0}".format(response_data))
178         if response_data.strip():
179             return json.loads(response_data)
180         else:
181             return None
182
183
184 class MDSCluster(CephCluster):
185     """
186     Collective operations on all the MDS daemons in the Ceph cluster.  These
187     daemons may be in use by various Filesystems.
188
189     For the benefit of pre-multi-filesystem tests, this class is also
190     a parent of Filesystem.  The correct way to use MDSCluster going forward is
191     as a separate instance outside of your (multiple) Filesystem instances.
192     """
193     def __init__(self, ctx):
194         super(MDSCluster, self).__init__(ctx)
195
196         self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
197
198         if len(self.mds_ids) == 0:
199             raise RuntimeError("This task requires at least one MDS")
200
201         if hasattr(self._ctx, "daemons"):
202             # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
203             self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
204
205     def _one_or_all(self, mds_id, cb, in_parallel=True):
206         """
207         Call a callback for a single named MDS, or for all.
208
209         Note that the parallelism here isn't for performance, it's to avoid being overly kind
210         to the cluster by waiting a graceful ssh-latency of time between doing things, and to
211         avoid being overly kind by executing them in a particular order.  However, some actions
212         don't cope with being done in parallel, so it's optional (`in_parallel`)
213
214         :param mds_id: MDS daemon name, or None
215         :param cb: Callback taking single argument of MDS daemon name
216         :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
217         """
218         if mds_id is None:
219             if in_parallel:
220                 with parallel() as p:
221                     for mds_id in self.mds_ids:
222                         p.spawn(cb, mds_id)
223             else:
224                 for mds_id in self.mds_ids:
225                     cb(mds_id)
226         else:
227             cb(mds_id)
228
229     def get_config(self, key, service_type=None):
230         """
231         get_config specialization of service_type="mds"
232         """
233         if service_type != "mds":
234             return super(MDSCluster, self).get_config(key, service_type)
235
236         # Some tests stop MDS daemons, don't send commands to a dead one:
237         service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
238         return self.json_asok(['config', 'get', key], service_type, service_id)[key]
239
240     def mds_stop(self, mds_id=None):
241         """
242         Stop the MDS daemon process(se).  If it held a rank, that rank
243         will eventually go laggy.
244         """
245         self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
246
247     def mds_fail(self, mds_id=None):
248         """
249         Inform MDSMonitor of the death of the daemon process(es).  If it held
250         a rank, that rank will be relinquished.
251         """
252         self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
253
254     def mds_restart(self, mds_id=None):
255         self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
256
257     def mds_fail_restart(self, mds_id=None):
258         """
259         Variation on restart that includes marking MDSs as failed, so that doing this
260         operation followed by waiting for healthy daemon states guarantees that they
261         have gone down and come up, rather than potentially seeing the healthy states
262         that existed before the restart.
263         """
264         def _fail_restart(id_):
265             self.mds_daemons[id_].stop()
266             self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
267             self.mds_daemons[id_].restart()
268
269         self._one_or_all(mds_id, _fail_restart)
270
271     def newfs(self, name='cephfs', create=True):
272         return Filesystem(self._ctx, name=name, create=create)
273
274     def status(self):
275         return FSStatus(self.mon_manager)
276
277     def delete_all_filesystems(self):
278         """
279         Remove all filesystems that exist, and any pools in use by them.
280         """
281         pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
282         pool_id_name = {}
283         for pool in pools:
284             pool_id_name[pool['pool']] = pool['pool_name']
285
286         # mark cluster down for each fs to prevent churn during deletion
287         status = self.status()
288         for fs in status.get_filesystems():
289             self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")
290
291         # get a new copy as actives may have since changed
292         status = self.status()
293         for fs in status.get_filesystems():
294             mdsmap = fs['mdsmap']
295             metadata_pool = pool_id_name[mdsmap['metadata_pool']]
296
297             for gid in mdsmap['up'].values():
298                 self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
299
300             self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
301             self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
302                                              metadata_pool, metadata_pool,
303                                              '--yes-i-really-really-mean-it')
304             for data_pool in mdsmap['data_pools']:
305                 data_pool = pool_id_name[data_pool]
306                 try:
307                     self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
308                                                      data_pool, data_pool,
309                                                      '--yes-i-really-really-mean-it')
310                 except CommandFailedError as e:
311                     if e.exitstatus == 16: # EBUSY, this data pool is used
312                         pass               # by two metadata pools, let the 2nd
313                     else:                  # pass delete it
314                         raise
315
316     def get_standby_daemons(self):
317         return set([s['name'] for s in self.status().get_standbys()])
318
319     def get_mds_hostnames(self):
320         result = set()
321         for mds_id in self.mds_ids:
322             mds_remote = self.mon_manager.find_remote('mds', mds_id)
323             result.add(mds_remote.hostname)
324
325         return list(result)
326
327     def set_clients_block(self, blocked, mds_id=None):
328         """
329         Block (using iptables) client communications to this MDS.  Be careful: if
330         other services are running on this MDS, or other MDSs try to talk to this
331         MDS, their communications may also be blocked as collatoral damage.
332
333         :param mds_id: Optional ID of MDS to block, default to all
334         :return:
335         """
336         da_flag = "-A" if blocked else "-D"
337
338         def set_block(_mds_id):
339             remote = self.mon_manager.find_remote('mds', _mds_id)
340             status = self.status()
341
342             addr = status.get_mds_addr(_mds_id)
343             ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
344
345             remote.run(
346                 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
347                       "comment", "--comment", "teuthology"])
348             remote.run(
349                 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
350                       "comment", "--comment", "teuthology"])
351
352         self._one_or_all(mds_id, set_block, in_parallel=False)
353
354     def clear_firewall(self):
355         clear_firewall(self._ctx)
356
357     def get_mds_info(self, mds_id):
358         return FSStatus(self.mon_manager).get_mds(mds_id)
359
360     def is_full(self):
361         flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
362         return 'full' in flags
363
364     def is_pool_full(self, pool_name):
365         pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
366         for pool in pools:
367             if pool['pool_name'] == pool_name:
368                 return 'full' in pool['flags_names'].split(",")
369
370         raise RuntimeError("Pool not found '{0}'".format(pool_name))
371
372 class Filesystem(MDSCluster):
373     """
374     This object is for driving a CephFS filesystem.  The MDS daemons driven by
375     MDSCluster may be shared with other Filesystems.
376     """
377     def __init__(self, ctx, fscid=None, name=None, create=False,
378                  ec_profile=None):
379         super(Filesystem, self).__init__(ctx)
380
381         self.name = name
382         self.ec_profile = ec_profile
383         self.id = None
384         self.metadata_pool_name = None
385         self.metadata_overlay = False
386         self.data_pool_name = None
387         self.data_pools = None
388
389         client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
390         self.client_id = client_list[0]
391         self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
392
393         if name is not None:
394             if fscid is not None:
395                 raise RuntimeError("cannot specify fscid when creating fs")
396             if create and not self.legacy_configured():
397                 self.create()
398         else:
399             if fscid is not None:
400                 self.id = fscid
401                 self.getinfo(refresh = True)
402
403         # Stash a reference to the first created filesystem on ctx, so
404         # that if someone drops to the interactive shell they can easily
405         # poke our methods.
406         if not hasattr(self._ctx, "filesystem"):
407             self._ctx.filesystem = self
408
409     def getinfo(self, refresh = False):
410         status = self.status()
411         if self.id is not None:
412             fsmap = status.get_fsmap(self.id)
413         elif self.name is not None:
414             fsmap = status.get_fsmap_byname(self.name)
415         else:
416             fss = [fs for fs in status.get_filesystems()]
417             if len(fss) == 1:
418                 fsmap = fss[0]
419             elif len(fss) == 0:
420                 raise RuntimeError("no file system available")
421             else:
422                 raise RuntimeError("more than one file system available")
423         self.id = fsmap['id']
424         self.name = fsmap['mdsmap']['fs_name']
425         self.get_pool_names(status = status, refresh = refresh)
426         return status
427
428     def set_metadata_overlay(self, overlay):
429         if self.id is not None:
430             raise RuntimeError("cannot specify fscid when configuring overlay")
431         self.metadata_overlay = overlay
432
433     def deactivate(self, rank):
434         if rank < 0:
435             raise RuntimeError("invalid rank")
436         elif rank == 0:
437             raise RuntimeError("cannot deactivate rank 0")
438         self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
439
440     def set_max_mds(self, max_mds):
441         self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)
442
443     def set_allow_dirfrags(self, yes):
444         self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')
445
446     def get_pgs_per_fs_pool(self):
447         """
448         Calculate how many PGs to use when creating a pool, in order to avoid raising any
449         health warnings about mon_pg_warn_min_per_osd
450
451         :return: an integer number of PGs
452         """
453         pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
454         osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
455         return pg_warn_min_per_osd * osd_count
456
457     def create(self):
458         if self.name is None:
459             self.name = "cephfs"
460         if self.metadata_pool_name is None:
461             self.metadata_pool_name = "{0}_metadata".format(self.name)
462         if self.data_pool_name is None:
463             data_pool_name = "{0}_data".format(self.name)
464         else:
465             data_pool_name = self.data_pool_name
466
467         log.info("Creating filesystem '{0}'".format(self.name))
468
469         pgs_per_fs_pool = self.get_pgs_per_fs_pool()
470
471         self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
472                                          self.metadata_pool_name, pgs_per_fs_pool.__str__())
473         if self.metadata_overlay:
474             self.mon_manager.raw_cluster_cmd('fs', 'new',
475                                              self.name, self.metadata_pool_name, data_pool_name,
476                                              '--allow-dangerous-metadata-overlay')
477         else:
478             if self.ec_profile:
479                 log.info("EC profile is %s", self.ec_profile)
480                 cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
481                 cmd.extend(self.ec_profile)
482                 self.mon_manager.raw_cluster_cmd(*cmd)
483                 self.mon_manager.raw_cluster_cmd(
484                     'osd', 'pool', 'create',
485                     data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
486                     data_pool_name)
487                 self.mon_manager.raw_cluster_cmd(
488                     'osd', 'pool', 'set',
489                     data_pool_name, 'allow_ec_overwrites', 'true')
490             else:
491                 self.mon_manager.raw_cluster_cmd(
492                     'osd', 'pool', 'create',
493                     data_pool_name, pgs_per_fs_pool.__str__())
494             self.mon_manager.raw_cluster_cmd('fs', 'new',
495                                              self.name, self.metadata_pool_name, data_pool_name)
496         self.check_pool_application(self.metadata_pool_name)
497         self.check_pool_application(data_pool_name)
498         # Turn off spurious standby count warnings from modifying max_mds in tests.
499         try:
500             self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
501         except CommandFailedError as e:
502             if e.exitstatus == 22:
503                 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
504                 pass
505             else:
506                 raise
507
508         self.getinfo(refresh = True)
509
510         
511     def check_pool_application(self, pool_name):
512         osd_map = self.mon_manager.get_osd_dump_json()
513         for pool in osd_map['pools']:
514             if pool['pool_name'] == pool_name:
515                 if "application_metadata" in pool:
516                     if not "cephfs" in pool['application_metadata']:
517                         raise RuntimeError("Pool %p does not name cephfs as application!".\
518                                            format(pool_name))
519         
520
521     def __del__(self):
522         if getattr(self._ctx, "filesystem", None) == self:
523             delattr(self._ctx, "filesystem")
524
525     def exists(self):
526         """
527         Whether a filesystem exists in the mon's filesystem list
528         """
529         fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
530         return self.name in [fs['name'] for fs in fs_list]
531
532     def legacy_configured(self):
533         """
534         Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
535         the case, the caller should avoid using Filesystem.create
536         """
537         try:
538             out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
539             pools = json.loads(out_text)
540             metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
541             if metadata_pool_exists:
542                 self.metadata_pool_name = 'metadata'
543         except CommandFailedError as e:
544             # For use in upgrade tests, Ceph cuttlefish and earlier don't support
545             # structured output (--format) from the CLI.
546             if e.exitstatus == 22:
547                 metadata_pool_exists = True
548             else:
549                 raise
550
551         return metadata_pool_exists
552
553     def _df(self):
554         return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
555
556     def get_mds_map(self):
557         return self.status().get_fsmap(self.id)['mdsmap']
558
559     def add_data_pool(self, name):
560         self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
561         self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
562         self.get_pool_names(refresh = True)
563         for poolid, fs_name in self.data_pools.items():
564             if name == fs_name:
565                 return poolid
566         raise RuntimeError("could not get just created pool '{0}'".format(name))
567
568     def get_pool_names(self, refresh = False, status = None):
569         if refresh or self.metadata_pool_name is None or self.data_pools is None:
570             if status is None:
571                 status = self.status()
572             fsmap = status.get_fsmap(self.id)
573
574             osd_map = self.mon_manager.get_osd_dump_json()
575             id_to_name = {}
576             for p in osd_map['pools']:
577                 id_to_name[p['pool']] = p['pool_name']
578
579             self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
580             self.data_pools = {}
581             for data_pool in fsmap['mdsmap']['data_pools']:
582                 self.data_pools[data_pool] = id_to_name[data_pool]
583
584     def get_data_pool_name(self, refresh = False):
585         if refresh or self.data_pools is None:
586             self.get_pool_names(refresh = True)
587         assert(len(self.data_pools) == 1)
588         return self.data_pools.values()[0]
589
590     def get_data_pool_id(self, refresh = False):
591         """
592         Don't call this if you have multiple data pools
593         :return: integer
594         """
595         if refresh or self.data_pools is None:
596             self.get_pool_names(refresh = True)
597         assert(len(self.data_pools) == 1)
598         return self.data_pools.keys()[0]
599
600     def get_data_pool_names(self, refresh = False):
601         if refresh or self.data_pools is None:
602             self.get_pool_names(refresh = True)
603         return self.data_pools.values()
604
605     def get_metadata_pool_name(self):
606         return self.metadata_pool_name
607
608     def set_data_pool_name(self, name):
609         if self.id is not None:
610             raise RuntimeError("can't set filesystem name if its fscid is set")
611         self.data_pool_name = name
612
613     def get_namespace_id(self):
614         return self.id
615
616     def get_pool_df(self, pool_name):
617         """
618         Return a dict like:
619         {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
620         """
621         for pool_df in self._df()['pools']:
622             if pool_df['name'] == pool_name:
623                 return pool_df['stats']
624
625         raise RuntimeError("Pool name '{0}' not found".format(pool_name))
626
627     def get_usage(self):
628         return self._df()['stats']['total_used_bytes']
629
630     def are_daemons_healthy(self):
631         """
632         Return true if all daemons are in one of active, standby, standby-replay, and
633         at least max_mds daemons are in 'active'.
634
635         Unlike most of Filesystem, this function is tolerant of new-style `fs`
636         commands being missing, because we are part of the ceph installation
637         process during upgrade suites, so must fall back to old style commands
638         when we get an EINVAL on a new style command.
639
640         :return:
641         """
642
643         active_count = 0
644         try:
645             mds_map = self.get_mds_map()
646         except CommandFailedError as cfe:
647             # Old version, fall back to non-multi-fs commands
648             if cfe.exitstatus == errno.EINVAL:
649                 mds_map = json.loads(
650                         self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
651             else:
652                 raise
653
654         log.info("are_daemons_healthy: mds map: {0}".format(mds_map))
655
656         for mds_id, mds_status in mds_map['info'].items():
657             if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
658                 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
659                 return False
660             elif mds_status['state'] == 'up:active':
661                 active_count += 1
662
663         log.info("are_daemons_healthy: {0}/{1}".format(
664             active_count, mds_map['max_mds']
665         ))
666
667         if active_count >= mds_map['max_mds']:
668             # The MDSMap says these guys are active, but let's check they really are
669             for mds_id, mds_status in mds_map['info'].items():
670                 if mds_status['state'] == 'up:active':
671                     try:
672                         daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
673                     except CommandFailedError as cfe:
674                         if cfe.exitstatus == errno.EINVAL:
675                             # Old version, can't do this check
676                             continue
677                         else:
678                             # MDS not even running
679                             return False
680
681                     if daemon_status['state'] != 'up:active':
682                         # MDS hasn't taken the latest map yet
683                         return False
684
685             return True
686         else:
687             return False
688
689     def get_daemon_names(self, state=None):
690         """
691         Return MDS daemon names of those daemons in the given state
692         :param state:
693         :return:
694         """
695         status = self.get_mds_map()
696         result = []
697         for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
698             if mds_status['state'] == state or state is None:
699                 result.append(mds_status['name'])
700
701         return result
702
703     def get_active_names(self):
704         """
705         Return MDS daemon names of those daemons holding ranks
706         in state up:active
707
708         :return: list of strings like ['a', 'b'], sorted by rank
709         """
710         return self.get_daemon_names("up:active")
711
712     def get_all_mds_rank(self):
713         status = self.get_mds_map()
714         result = []
715         for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
716             if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
717                 result.append(mds_status['rank'])
718
719         return result
720
721     def get_rank_names(self):
722         """
723         Return MDS daemon names of those daemons holding a rank,
724         sorted by rank.  This includes e.g. up:replay/reconnect
725         as well as active, but does not include standby or
726         standby-replay.
727         """
728         status = self.get_mds_map()
729         result = []
730         for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
731             if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
732                 result.append(mds_status['name'])
733
734         return result
735
736     def wait_for_daemons(self, timeout=None):
737         """
738         Wait until all daemons are healthy
739         :return:
740         """
741
742         if timeout is None:
743             timeout = DAEMON_WAIT_TIMEOUT
744
745         elapsed = 0
746         while True:
747             if self.are_daemons_healthy():
748                 return
749             else:
750                 time.sleep(1)
751                 elapsed += 1
752
753             if elapsed > timeout:
754                 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
755
756     def get_lone_mds_id(self):
757         """
758         Get a single MDS ID: the only one if there is only one
759         configured, else the only one currently holding a rank,
760         else raise an error.
761         """
762         if len(self.mds_ids) != 1:
763             alive = self.get_rank_names()
764             if len(alive) == 1:
765                 return alive[0]
766             else:
767                 raise ValueError("Explicit MDS argument required when multiple MDSs in use")
768         else:
769             return self.mds_ids[0]
770
771     def recreate(self):
772         log.info("Creating new filesystem")
773         self.delete_all_filesystems()
774         self.id = None
775         self.create()
776
777     def put_metadata_object_raw(self, object_id, infile):
778         """
779         Save an object to the metadata pool
780         """
781         temp_bin_path = infile
782         self.client_remote.run(args=[
783             'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
784         ])
785
786     def get_metadata_object_raw(self, object_id):
787         """
788         Retrieve an object from the metadata pool and store it in a file.
789         """
790         temp_bin_path = '/tmp/' + object_id + '.bin'
791
792         self.client_remote.run(args=[
793             'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
794         ])
795
796         return temp_bin_path
797
798     def get_metadata_object(self, object_type, object_id):
799         """
800         Retrieve an object from the metadata pool, pass it through
801         ceph-dencoder to dump it to JSON, and return the decoded object.
802         """
803         temp_bin_path = '/tmp/out.bin'
804
805         self.client_remote.run(args=[
806             'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
807         ])
808
809         stdout = StringIO()
810         self.client_remote.run(args=[
811             'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
812         ], stdout=stdout)
813         dump_json = stdout.getvalue().strip()
814         try:
815             dump = json.loads(dump_json)
816         except (TypeError, ValueError):
817             log.error("Failed to decode JSON: '{0}'".format(dump_json))
818             raise
819
820         return dump
821
822     def get_journal_version(self):
823         """
824         Read the JournalPointer and Journal::Header objects to learn the version of
825         encoding in use.
826         """
827         journal_pointer_object = '400.00000000'
828         journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
829         journal_ino = journal_pointer_dump['journal_pointer']['front']
830
831         journal_header_object = "{0:x}.00000000".format(journal_ino)
832         journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
833
834         version = journal_header_dump['journal_header']['stream_format']
835         log.info("Read journal version {0}".format(version))
836
837         return version
838
839     def mds_asok(self, command, mds_id=None):
840         if mds_id is None:
841             mds_id = self.get_lone_mds_id()
842
843         return self.json_asok(command, 'mds', mds_id)
844
845     def read_cache(self, path, depth=None):
846         cmd = ["dump", "tree", path]
847         if depth is not None:
848             cmd.append(depth.__str__())
849         result = self.mds_asok(cmd)
850         if len(result) == 0:
851             raise RuntimeError("Path not found in cache: {0}".format(path))
852
853         return result
854
855     def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
856         """
857         Block until the MDS reaches a particular state, or a failure condition
858         is met.
859
860         When there are multiple MDSs, succeed when exaclty one MDS is in the
861         goal state, or fail when any MDS is in the reject state.
862
863         :param goal_state: Return once the MDS is in this state
864         :param reject: Fail if the MDS enters this state before the goal state
865         :param timeout: Fail if this many seconds pass before reaching goal
866         :return: number of seconds waited, rounded down to integer
867         """
868
869         started_at = time.time()
870         while True:
871             status = self.status()
872             if rank is not None:
873                 mds_info = status.get_rank(self.id, rank)
874                 current_state = mds_info['state'] if mds_info else None
875                 log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
876             elif mds_id is not None:
877                 # mds_info is None if no daemon with this ID exists in the map
878                 mds_info = status.get_mds(mds_id)
879                 current_state = mds_info['state'] if mds_info else None
880                 log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
881             else:
882                 # In general, look for a single MDS
883                 states = [m['state'] for m in status.get_ranks(self.id)]
884                 if [s for s in states if s == goal_state] == [goal_state]:
885                     current_state = goal_state
886                 elif reject in states:
887                     current_state = reject
888                 else:
889                     current_state = None
890                 log.info("mapped states {0} to {1}".format(states, current_state))
891
892             elapsed = time.time() - started_at
893             if current_state == goal_state:
894                 log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
895                 return elapsed
896             elif reject is not None and current_state == reject:
897                 raise RuntimeError("MDS in reject state {0}".format(current_state))
898             elif timeout is not None and elapsed > timeout:
899                 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
900                 raise RuntimeError(
901                     "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
902                         elapsed, goal_state, current_state
903                     ))
904             else:
905                 time.sleep(1)
906
907     def _read_data_xattr(self, ino_no, xattr_name, type, pool):
908         mds_id = self.mds_ids[0]
909         remote = self.mds_daemons[mds_id].remote
910         if pool is None:
911             pool = self.get_data_pool_name()
912
913         obj_name = "{0:x}.00000000".format(ino_no)
914
915         args = [
916             os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
917         ]
918         try:
919             proc = remote.run(
920                 args=args,
921                 stdout=StringIO())
922         except CommandFailedError as e:
923             log.error(e.__str__())
924             raise ObjectNotFound(obj_name)
925
926         data = proc.stdout.getvalue()
927
928         p = remote.run(
929             args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
930             stdout=StringIO(),
931             stdin=data
932         )
933
934         return json.loads(p.stdout.getvalue().strip())
935
936     def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
937         """
938         Write to an xattr of the 0th data object of an inode.  Will
939         succeed whether the object and/or xattr already exist or not.
940
941         :param ino_no: integer inode number
942         :param xattr_name: string name of the xattr
943         :param data: byte array data to write to the xattr
944         :param pool: name of data pool or None to use primary data pool
945         :return: None
946         """
947         remote = self.mds_daemons[self.mds_ids[0]].remote
948         if pool is None:
949             pool = self.get_data_pool_name()
950
951         obj_name = "{0:x}.00000000".format(ino_no)
952         args = [
953             os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
954             obj_name, xattr_name, data
955         ]
956         remote.run(
957             args=args,
958             stdout=StringIO())
959
960     def read_backtrace(self, ino_no, pool=None):
961         """
962         Read the backtrace from the data pool, return a dict in the format
963         given by inode_backtrace_t::dump, which is something like:
964
965         ::
966
967             rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
968             ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
969
970             { "ino": 1099511627778,
971               "ancestors": [
972                     { "dirino": 1,
973                       "dname": "blah",
974                       "version": 11}],
975               "pool": 1,
976               "old_pools": []}
977
978         :param pool: name of pool to read backtrace from.  If omitted, FS must have only
979                      one data pool and that will be used.
980         """
981         return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
982
983     def read_layout(self, ino_no, pool=None):
984         """
985         Read 'layout' xattr of an inode and parse the result, returning a dict like:
986         ::
987             {
988                 "stripe_unit": 4194304,
989                 "stripe_count": 1,
990                 "object_size": 4194304,
991                 "pool_id": 1,
992                 "pool_ns": "",
993             }
994
995         :param pool: name of pool to read backtrace from.  If omitted, FS must have only
996                      one data pool and that will be used.
997         """
998         return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
999
1000     def _enumerate_data_objects(self, ino, size):
1001         """
1002         Get the list of expected data objects for a range, and the list of objects
1003         that really exist.
1004
1005         :return a tuple of two lists of strings (expected, actual)
1006         """
1007         stripe_size = 1024 * 1024 * 4
1008
1009         size = max(stripe_size, size)
1010
1011         want_objects = [
1012             "{0:x}.{1:08x}".format(ino, n)
1013             for n in range(0, ((size - 1) / stripe_size) + 1)
1014         ]
1015
1016         exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
1017
1018         return want_objects, exist_objects
1019
1020     def data_objects_present(self, ino, size):
1021         """
1022         Check that *all* the expected data objects for an inode are present in the data pool
1023         """
1024
1025         want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1026         missing = set(want_objects) - set(exist_objects)
1027
1028         if missing:
1029             log.info("Objects missing (ino {0}, size {1}): {2}".format(
1030                 ino, size, missing
1031             ))
1032             return False
1033         else:
1034             log.info("All objects for ino {0} size {1} found".format(ino, size))
1035             return True
1036
1037     def data_objects_absent(self, ino, size):
1038         want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1039         present = set(want_objects) & set(exist_objects)
1040
1041         if present:
1042             log.info("Objects not absent (ino {0}, size {1}): {2}".format(
1043                 ino, size, present
1044             ))
1045             return False
1046         else:
1047             log.info("All objects for ino {0} size {1} are absent".format(ino, size))
1048             return True
1049
1050     def dirfrag_exists(self, ino, frag):
1051         try:
1052             self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1053         except CommandFailedError as e:
1054             return False
1055         else:
1056             return True
1057
1058     def rados(self, args, pool=None, namespace=None, stdin_data=None):
1059         """
1060         Call into the `rados` CLI from an MDS
1061         """
1062
1063         if pool is None:
1064             pool = self.get_metadata_pool_name()
1065
1066         # Doesn't matter which MDS we use to run rados commands, they all
1067         # have access to the pools
1068         mds_id = self.mds_ids[0]
1069         remote = self.mds_daemons[mds_id].remote
1070
1071         # NB we could alternatively use librados pybindings for this, but it's a one-liner
1072         # using the `rados` CLI
1073         args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
1074                 (["--namespace", namespace] if namespace else []) +
1075                 args)
1076         p = remote.run(
1077             args=args,
1078             stdin=stdin_data,
1079             stdout=StringIO())
1080         return p.stdout.getvalue().strip()
1081
1082     def list_dirfrag(self, dir_ino):
1083         """
1084         Read the named object and return the list of omap keys
1085
1086         :return a list of 0 or more strings
1087         """
1088
1089         dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1090
1091         try:
1092             key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
1093         except CommandFailedError as e:
1094             log.error(e.__str__())
1095             raise ObjectNotFound(dirfrag_obj_name)
1096
1097         return key_list_str.split("\n") if key_list_str else []
1098
1099     def erase_metadata_objects(self, prefix):
1100         """
1101         For all objects in the metadata pool matching the prefix,
1102         erase them.
1103
1104         This O(N) with the number of objects in the pool, so only suitable
1105         for use on toy test filesystems.
1106         """
1107         all_objects = self.rados(["ls"]).split("\n")
1108         matching_objects = [o for o in all_objects if o.startswith(prefix)]
1109         for o in matching_objects:
1110             self.rados(["rm", o])
1111
1112     def erase_mds_objects(self, rank):
1113         """
1114         Erase all the per-MDS objects for a particular rank.  This includes
1115         inotable, sessiontable, journal
1116         """
1117
1118         def obj_prefix(multiplier):
1119             """
1120             MDS object naming conventions like rank 1's
1121             journal is at 201.***
1122             """
1123             return "%x." % (multiplier * 0x100 + rank)
1124
1125         # MDS_INO_LOG_OFFSET
1126         self.erase_metadata_objects(obj_prefix(2))
1127         # MDS_INO_LOG_BACKUP_OFFSET
1128         self.erase_metadata_objects(obj_prefix(3))
1129         # MDS_INO_LOG_POINTER_OFFSET
1130         self.erase_metadata_objects(obj_prefix(4))
1131         # MDSTables & SessionMap
1132         self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1133
1134     @property
1135     def _prefix(self):
1136         """
1137         Override this to set a different
1138         """
1139         return ""
1140
1141     def _run_tool(self, tool, args, rank=None, quiet=False):
1142         # Tests frequently have [client] configuration that jacks up
1143         # the objecter log level (unlikely to be interesting here)
1144         # and does not set the mds log level (very interesting here)
1145         if quiet:
1146             base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1147         else:
1148             base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1149
1150         if rank is not None:
1151             base_args.extend(["--rank", "%d" % rank])
1152
1153         t1 = datetime.datetime.now()
1154         r = self.tool_remote.run(
1155             args=base_args + args,
1156             stdout=StringIO()).stdout.getvalue().strip()
1157         duration = datetime.datetime.now() - t1
1158         log.info("Ran {0} in time {1}, result:\n{2}".format(
1159             base_args + args, duration, r
1160         ))
1161         return r
1162
1163     @property
1164     def tool_remote(self):
1165         """
1166         An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
1167         it'll definitely have keys with perms to access cephfs metadata pool.  This is public
1168         so that tests can use this remote to go get locally written output files from the tools.
1169         """
1170         mds_id = self.mds_ids[0]
1171         return self.mds_daemons[mds_id].remote
1172
1173     def journal_tool(self, args, rank=None, quiet=False):
1174         """
1175         Invoke cephfs-journal-tool with the passed arguments, and return its stdout
1176         """
1177         return self._run_tool("cephfs-journal-tool", args, rank, quiet)
1178
1179     def table_tool(self, args, quiet=False):
1180         """
1181         Invoke cephfs-table-tool with the passed arguments, and return its stdout
1182         """
1183         return self._run_tool("cephfs-table-tool", args, None, quiet)
1184
1185     def data_scan(self, args, quiet=False, worker_count=1):
1186         """
1187         Invoke cephfs-data-scan with the passed arguments, and return its stdout
1188
1189         :param worker_count: if greater than 1, multiple workers will be run
1190                              in parallel and the return value will be None
1191         """
1192
1193         workers = []
1194
1195         for n in range(0, worker_count):
1196             if worker_count > 1:
1197                 # data-scan args first token is a command, followed by args to it.
1198                 # insert worker arguments after the command.
1199                 cmd = args[0]
1200                 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1201             else:
1202                 worker_args = args
1203
1204             workers.append(Greenlet.spawn(lambda wargs=worker_args:
1205                                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1206
1207         for w in workers:
1208             w.get()
1209
1210         if worker_count == 1:
1211             return workers[0].value
1212         else:
1213             return None