Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / vstart_runner.py
1 """
2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster.  Use this to turn around test cases
4 quickly during development.
5
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
7
8     # Activate the teuthology virtualenv
9     source ~/git/teuthology/virtualenv/bin/activate
10     # Go into your ceph build directory
11     cd ~/git/ceph/build
12     # Invoke a test using this script
13     python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
14
15 Alternative usage:
16
17     # Alternatively, if you use different paths, specify them as follows:
18     LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py
19
20     # If you wish to drop to a python shell on failures, use --interactive:
21     python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
22
23     # If you wish to run a named test case, pass it as an argument:
24     python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
25
26 """
27
28 from StringIO import StringIO
29 from collections import defaultdict
30 import getpass
31 import signal
32 import tempfile
33 import threading
34 import datetime
35 import shutil
36 import re
37 import os
38 import time
39 import json
40 import sys
41 import errno
42 from unittest import suite, loader
43 import unittest
44 import platform
45 from teuthology.orchestra.run import Raw, quote
46 from teuthology.orchestra.daemon import DaemonGroup
47 from teuthology.config import config as teuth_config
48
49 import logging
50
51 log = logging.getLogger(__name__)
52
53 handler = logging.FileHandler("./vstart_runner.log")
54 formatter = logging.Formatter(
55     fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
56     datefmt='%Y-%m-%dT%H:%M:%S')
57 handler.setFormatter(formatter)
58 log.addHandler(handler)
59 log.setLevel(logging.INFO)
60
61
62 def respawn_in_path(lib_path, python_paths):
63     execv_cmd = ['python']
64     if platform.system() == "Darwin":
65         lib_path_var = "DYLD_LIBRARY_PATH"
66     else:
67         lib_path_var = "LD_LIBRARY_PATH"
68
69     py_binary = os.environ.get("PYTHON", "python")
70
71     if lib_path_var in os.environ:
72         if lib_path not in os.environ[lib_path_var]:
73             os.environ[lib_path_var] += ':' + lib_path
74             os.execvp(py_binary, execv_cmd + sys.argv)
75     else:
76         os.environ[lib_path_var] = lib_path
77         os.execvp(py_binary, execv_cmd + sys.argv)
78
79     for p in python_paths:
80         sys.path.insert(0, p)
81
82
83 # Let's use some sensible defaults
84 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
85
86     # A list of candidate paths for each package we need
87     guesses = [
88         ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
89         ["lib/cython_modules/lib.2"],
90         ["../src/pybind"],
91     ]
92
93     python_paths = []
94
95     # Up one level so that "tasks.foo.bar" imports work
96     python_paths.append(os.path.abspath(
97         os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
98     ))
99
100     for package_guesses in guesses:
101         for g in package_guesses:
102             g_exp = os.path.abspath(os.path.expanduser(g))
103             if os.path.exists(g_exp):
104                 python_paths.append(g_exp)
105
106     ld_path = os.path.join(os.getcwd(), "lib/")
107     print "Using guessed paths {0} {1}".format(ld_path, python_paths)
108     respawn_in_path(ld_path, python_paths)
109
110
111 try:
112     from teuthology.exceptions import CommandFailedError
113     from tasks.ceph_manager import CephManager
114     from tasks.cephfs.fuse_mount import FuseMount
115     from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
116     from mgr.mgr_test_case import MgrCluster
117     from teuthology.contextutil import MaxWhileTries
118     from teuthology.task import interactive
119 except ImportError:
120     sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
121                      "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
122     raise
123
124 # Must import after teuthology because of gevent monkey patching
125 import subprocess
126
127 if os.path.exists("./CMakeCache.txt"):
128     # Running in build dir of a cmake build
129     BIN_PREFIX = "./bin/"
130     SRC_PREFIX = "../src"
131 else:
132     # Running in src/ of an autotools build
133     BIN_PREFIX = "./"
134     SRC_PREFIX = "./"
135
136
137 class LocalRemoteProcess(object):
138     def __init__(self, args, subproc, check_status, stdout, stderr):
139         self.args = args
140         self.subproc = subproc
141         if stdout is None:
142             self.stdout = StringIO()
143         else:
144             self.stdout = stdout
145
146         if stderr is None:
147             self.stderr = StringIO()
148         else:
149             self.stderr = stderr
150
151         self.check_status = check_status
152         self.exitstatus = self.returncode = None
153
154     def wait(self):
155         if self.finished:
156             # Avoid calling communicate() on a dead process because it'll
157             # give you stick about std* already being closed
158             if self.exitstatus != 0:
159                 raise CommandFailedError(self.args, self.exitstatus)
160             else:
161                 return
162
163         out, err = self.subproc.communicate()
164         self.stdout.write(out)
165         self.stderr.write(err)
166
167         self.exitstatus = self.returncode = self.subproc.returncode
168
169         if self.exitstatus != 0:
170             sys.stderr.write(out)
171             sys.stderr.write(err)
172
173         if self.check_status and self.exitstatus != 0:
174             raise CommandFailedError(self.args, self.exitstatus)
175
176     @property
177     def finished(self):
178         if self.exitstatus is not None:
179             return True
180
181         if self.subproc.poll() is not None:
182             out, err = self.subproc.communicate()
183             self.stdout.write(out)
184             self.stderr.write(err)
185             self.exitstatus = self.returncode = self.subproc.returncode
186             return True
187         else:
188             return False
189
190     def kill(self):
191         log.info("kill ")
192         if self.subproc.pid and not self.finished:
193             log.info("kill: killing pid {0} ({1})".format(
194                 self.subproc.pid, self.args))
195             safe_kill(self.subproc.pid)
196         else:
197             log.info("kill: already terminated ({0})".format(self.args))
198
199     @property
200     def stdin(self):
201         class FakeStdIn(object):
202             def __init__(self, mount_daemon):
203                 self.mount_daemon = mount_daemon
204
205             def close(self):
206                 self.mount_daemon.kill()
207
208         return FakeStdIn(self)
209
210
211 class LocalRemote(object):
212     """
213     Amusingly named class to present the teuthology RemoteProcess interface when we are really
214     running things locally for vstart
215
216     Run this inside your src/ dir!
217     """
218
219     def __init__(self):
220         self.name = "local"
221         self.hostname = "localhost"
222         self.user = getpass.getuser()
223
224     def get_file(self, path, sudo, dest_dir):
225         tmpfile = tempfile.NamedTemporaryFile(delete=False).name
226         shutil.copy(path, tmpfile)
227         return tmpfile
228
229     def put_file(self, src, dst, sudo=False):
230         shutil.copy(src, dst)
231
232     def run(self, args, check_status=True, wait=True,
233             stdout=None, stderr=None, cwd=None, stdin=None,
234             logger=None, label=None, env=None):
235         log.info("run args={0}".format(args))
236
237         # We don't need no stinkin' sudo
238         args = [a for a in args if a != "sudo"]
239
240         # We have to use shell=True if any run.Raw was present, e.g. &&
241         shell = any([a for a in args if isinstance(a, Raw)])
242
243         if shell:
244             filtered = []
245             i = 0
246             while i < len(args):
247                 if args[i] == 'adjust-ulimits':
248                     i += 1
249                 elif args[i] == 'ceph-coverage':
250                     i += 2
251                 elif args[i] == 'timeout':
252                     i += 2
253                 else:
254                     filtered.append(args[i])
255                     i += 1
256
257             args = quote(filtered)
258             log.info("Running {0}".format(args))
259
260             subproc = subprocess.Popen(args,
261                                        stdout=subprocess.PIPE,
262                                        stderr=subprocess.PIPE,
263                                        stdin=subprocess.PIPE,
264                                        cwd=cwd,
265                                        shell=True)
266         else:
267             log.info("Running {0}".format(args))
268
269             for arg in args:
270                 if not isinstance(arg, basestring):
271                     raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
272                         arg, arg.__class__
273                     ))
274
275             subproc = subprocess.Popen(args,
276                                        stdout=subprocess.PIPE,
277                                        stderr=subprocess.PIPE,
278                                        stdin=subprocess.PIPE,
279                                        cwd=cwd,
280                                        env=env)
281
282         if stdin:
283             if not isinstance(stdin, basestring):
284                 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
285
286             # Hack: writing to stdin is not deadlock-safe, but it "always" works
287             # as long as the input buffer is "small"
288             subproc.stdin.write(stdin)
289
290         proc = LocalRemoteProcess(
291             args, subproc, check_status,
292             stdout, stderr
293         )
294
295         if wait:
296             proc.wait()
297
298         return proc
299
300
301 class LocalDaemon(object):
302     def __init__(self, daemon_type, daemon_id):
303         self.daemon_type = daemon_type
304         self.daemon_id = daemon_id
305         self.controller = LocalRemote()
306         self.proc = None
307
308     @property
309     def remote(self):
310         return LocalRemote()
311
312     def running(self):
313         return self._get_pid() is not None
314
315     def _get_pid(self):
316         """
317         Return PID as an integer or None if not found
318         """
319         ps_txt = self.controller.run(
320             args=["ps", "ww", "-u"+str(os.getuid())]
321         ).stdout.getvalue().strip()
322         lines = ps_txt.split("\n")[1:]
323
324         for line in lines:
325             if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
326                 log.info("Found ps line for daemon: {0}".format(line))
327                 return int(line.split()[0])
328         log.info("No match for {0} {1}: {2}".format(
329             self.daemon_type, self.daemon_id, ps_txt
330             ))
331         return None
332
333     def wait(self, timeout):
334         waited = 0
335         while self._get_pid() is not None:
336             if waited > timeout:
337                 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
338             time.sleep(1)
339             waited += 1
340
341     def stop(self, timeout=300):
342         if not self.running():
343             log.error('tried to stop a non-running daemon')
344             return
345
346         pid = self._get_pid()
347         log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
348         os.kill(pid, signal.SIGKILL)
349
350         waited = 0
351         while pid is not None:
352             new_pid = self._get_pid()
353             if new_pid is not None and new_pid != pid:
354                 log.info("Killing new PID {0}".format(new_pid))
355                 pid = new_pid
356                 os.kill(pid, signal.SIGKILL)
357
358             if new_pid is None:
359                 break
360             else:
361                 if waited > timeout:
362                     raise MaxWhileTries(
363                         "Timed out waiting for daemon {0}.{1}".format(
364                             self.daemon_type, self.daemon_id))
365                 time.sleep(1)
366                 waited += 1
367
368         self.wait(timeout=timeout)
369
370     def restart(self):
371         if self._get_pid() is not None:
372             self.stop()
373
374         self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
375
376
377 def safe_kill(pid):
378     """
379     os.kill annoyingly raises exception if process already dead.  Ignore it.
380     """
381     try:
382         return os.kill(pid, signal.SIGKILL)
383     except OSError as e:
384         if e.errno == errno.ESRCH:
385             # Raced with process termination
386             pass
387         else:
388             raise
389
390
391 class LocalFuseMount(FuseMount):
392     def __init__(self, test_dir, client_id):
393         super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
394
395     @property
396     def config_path(self):
397         return "./ceph.conf"
398
399     def get_keyring_path(self):
400         # This is going to end up in a config file, so use an absolute path
401         # to avoid assumptions about daemons' pwd
402         return os.path.abspath("./client.{0}.keyring".format(self.client_id))
403
404     def run_shell(self, args, wait=True):
405         # FIXME maybe should add a pwd arg to teuthology.orchestra so that
406         # the "cd foo && bar" shenanigans isn't needed to begin with and
407         # then we wouldn't have to special case this
408         return self.client_remote.run(
409             args, wait=wait, cwd=self.mountpoint
410         )
411
412     @property
413     def _prefix(self):
414         return BIN_PREFIX
415
416     def _asok_path(self):
417         # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
418         # run foreground.  When running it daemonized however, the asok is named after
419         # the PID of the launching process, not the long running ceph-fuse process.  Therefore
420         # we need to give an exact path here as the logic for checking /proc/ for which
421         # asok is alive does not work.
422         path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
423         log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
424         return path
425
426     def umount(self):
427         if self.is_mounted():
428             super(LocalFuseMount, self).umount()
429
430     def mount(self, mount_path=None, mount_fs_name=None):
431         self.client_remote.run(
432             args=[
433                 'mkdir',
434                 '--',
435                 self.mountpoint,
436             ],
437         )
438
439         def list_connections():
440             self.client_remote.run(
441                 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
442                 check_status=False
443             )
444             p = self.client_remote.run(
445                 args=["ls", "/sys/fs/fuse/connections"],
446                 check_status=False
447             )
448             if p.exitstatus != 0:
449                 log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
450                 return []
451
452             ls_str = p.stdout.getvalue().strip()
453             if ls_str:
454                 return [int(n) for n in ls_str.split("\n")]
455             else:
456                 return []
457
458         # Before starting ceph-fuse process, note the contents of
459         # /sys/fs/fuse/connections
460         pre_mount_conns = list_connections()
461         log.info("Pre-mount connections: {0}".format(pre_mount_conns))
462
463         prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
464         if os.getuid() != 0:
465             prefix += ["--client-die-on-failed-remount=false"]
466
467         if mount_path is not None:
468             prefix += ["--client_mountpoint={0}".format(mount_path)]
469
470         if mount_fs_name is not None:
471             prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]
472
473         self.fuse_daemon = self.client_remote.run(args=
474                                             prefix + [
475                                                 "-f",
476                                                 "--name",
477                                                 "client.{0}".format(self.client_id),
478                                                 self.mountpoint
479                                             ], wait=False)
480
481         log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
482
483         # Wait for the connection reference to appear in /sys
484         waited = 0
485         post_mount_conns = list_connections()
486         while len(post_mount_conns) <= len(pre_mount_conns):
487             if self.fuse_daemon.finished:
488                 # Did mount fail?  Raise the CommandFailedError instead of
489                 # hitting the "failed to populate /sys/" timeout
490                 self.fuse_daemon.wait()
491             time.sleep(1)
492             waited += 1
493             if waited > 30:
494                 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
495                     waited
496                 ))
497             post_mount_conns = list_connections()
498
499         log.info("Post-mount connections: {0}".format(post_mount_conns))
500
501         # Record our fuse connection number so that we can use it when
502         # forcing an unmount
503         new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
504         if len(new_conns) == 0:
505             raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
506         elif len(new_conns) > 1:
507             raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
508         else:
509             self._fuse_conn = new_conns[0]
510
511     def _run_python(self, pyscript):
512         """
513         Override this to remove the daemon-helper prefix that is used otherwise
514         to make the process killable.
515         """
516         return self.client_remote.run(args=[
517             'python', '-c', pyscript
518         ], wait=False)
519
520
521 class LocalCephManager(CephManager):
522     def __init__(self):
523         # Deliberately skip parent init, only inheriting from it to get
524         # util methods like osd_dump that sit on top of raw_cluster_cmd
525         self.controller = LocalRemote()
526
527         # A minority of CephManager fns actually bother locking for when
528         # certain teuthology tests want to run tasks in parallel
529         self.lock = threading.RLock()
530
531         self.log = lambda x: log.info(x)
532
533     def find_remote(self, daemon_type, daemon_id):
534         """
535         daemon_type like 'mds', 'osd'
536         daemon_id like 'a', '0'
537         """
538         return LocalRemote()
539
540     def run_ceph_w(self):
541         proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
542         return proc
543
544     def raw_cluster_cmd(self, *args):
545         """
546         args like ["osd", "dump"}
547         return stdout string
548         """
549         proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
550         return proc.stdout.getvalue()
551
552     def raw_cluster_cmd_result(self, *args):
553         """
554         like raw_cluster_cmd but don't check status, just return rc
555         """
556         proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
557         return proc.exitstatus
558
559     def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
560         return self.controller.run(
561             args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
562         )
563
564     # FIXME: copypasta
565     def get_mds_status(self, mds):
566         """
567         Run cluster commands for the mds in order to get mds information
568         """
569         out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
570         j = json.loads(' '.join(out.splitlines()[1:]))
571         # collate; for dup ids, larger gid wins.
572         for info in j['info'].itervalues():
573             if info['name'] == mds:
574                 return info
575         return None
576
577     # FIXME: copypasta
578     def get_mds_status_by_rank(self, rank):
579         """
580         Run cluster commands for the mds in order to get mds information
581         check rank.
582         """
583         j = self.get_mds_status_all()
584         # collate; for dup ids, larger gid wins.
585         for info in j['info'].itervalues():
586             if info['rank'] == rank:
587                 return info
588         return None
589
590     def get_mds_status_all(self):
591         """
592         Run cluster command to extract all the mds status.
593         """
594         out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
595         j = json.loads(' '.join(out.splitlines()[1:]))
596         return j
597
598
599 class LocalCephCluster(CephCluster):
600     def __init__(self, ctx):
601         # Deliberately skip calling parent constructor
602         self._ctx = ctx
603         self.mon_manager = LocalCephManager()
604         self._conf = defaultdict(dict)
605
606     @property
607     def admin_remote(self):
608         return LocalRemote()
609
610     def get_config(self, key, service_type=None):
611         if service_type is None:
612             service_type = 'mon'
613
614         # FIXME hardcoded vstart service IDs
615         service_id = {
616             'mon': 'a',
617             'mds': 'a',
618             'osd': '0'
619         }[service_type]
620
621         return self.json_asok(['config', 'get', key], service_type, service_id)[key]
622
623     def _write_conf(self):
624         # In teuthology, we have the honour of writing the entire ceph.conf, but
625         # in vstart land it has mostly already been written and we need to carefully
626         # append to it.
627         conf_path = "./ceph.conf"
628         banner = "\n#LOCAL_TEST\n"
629         existing_str = open(conf_path).read()
630
631         if banner in existing_str:
632             existing_str = existing_str[0:existing_str.find(banner)]
633
634         existing_str += banner
635
636         for subsys, kvs in self._conf.items():
637             existing_str += "\n[{0}]\n".format(subsys)
638             for key, val in kvs.items():
639                 # Comment out existing instance if it exists
640                 log.info("Searching for existing instance {0}/{1}".format(
641                     key, subsys
642                 ))
643                 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
644                     subsys
645                 ), existing_str, re.MULTILINE)
646
647                 if existing_section:
648                     section_str = existing_str[existing_section.start():existing_section.end()]
649                     existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
650                     if existing_val:
651                         start = existing_section.start() + existing_val.start(1)
652                         log.info("Found string to replace at {0}".format(
653                             start
654                         ))
655                         existing_str = existing_str[0:start] + "#" + existing_str[start:]
656
657                 existing_str += "{0} = {1}\n".format(key, val)
658
659         open(conf_path, "w").write(existing_str)
660
661     def set_ceph_conf(self, subsys, key, value):
662         self._conf[subsys][key] = value
663         self._write_conf()
664
665     def clear_ceph_conf(self, subsys, key):
666         del self._conf[subsys][key]
667         self._write_conf()
668
669
670 class LocalMDSCluster(LocalCephCluster, MDSCluster):
671     def __init__(self, ctx):
672         super(LocalMDSCluster, self).__init__(ctx)
673
674         self.mds_ids = ctx.daemons.daemons['mds'].keys()
675         self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
676
677     def clear_firewall(self):
678         # FIXME: unimplemented
679         pass
680
681     def newfs(self, name='cephfs', create=True):
682         return LocalFilesystem(self._ctx, name=name, create=create)
683
684
685 class LocalMgrCluster(LocalCephCluster, MgrCluster):
686     def __init__(self, ctx):
687         super(LocalMgrCluster, self).__init__(ctx)
688
689         self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
690         self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
691
692
693 class LocalFilesystem(Filesystem, LocalMDSCluster):
694     def __init__(self, ctx, fscid=None, name='cephfs', create=False):
695         # Deliberately skip calling parent constructor
696         self._ctx = ctx
697
698         self.id = None
699         self.name = None
700         self.metadata_pool_name = None
701         self.metadata_overlay = False
702         self.data_pool_name = None
703         self.data_pools = None
704
705         # Hack: cheeky inspection of ceph.conf to see what MDSs exist
706         self.mds_ids = set()
707         for line in open("ceph.conf").readlines():
708             match = re.match("^\[mds\.(.+)\]$", line)
709             if match:
710                 self.mds_ids.add(match.group(1))
711
712         if not self.mds_ids:
713             raise RuntimeError("No MDSs found in ceph.conf!")
714
715         self.mds_ids = list(self.mds_ids)
716
717         log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
718
719         self.mon_manager = LocalCephManager()
720
721         self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
722
723         self.client_remote = LocalRemote()
724
725         self._conf = defaultdict(dict)
726
727         if name is not None:
728             if fscid is not None:
729                 raise RuntimeError("cannot specify fscid when creating fs")
730             if create and not self.legacy_configured():
731                 self.create()
732         else:
733             if fscid is not None:
734                 self.id = fscid
735                 self.getinfo(refresh=True)
736
737         # Stash a reference to the first created filesystem on ctx, so
738         # that if someone drops to the interactive shell they can easily
739         # poke our methods.
740         if not hasattr(self._ctx, "filesystem"):
741             self._ctx.filesystem = self
742
743     @property
744     def _prefix(self):
745         return BIN_PREFIX
746
747     def set_clients_block(self, blocked, mds_id=None):
748         raise NotImplementedError()
749
750     def get_pgs_per_fs_pool(self):
751         # FIXME: assuming there are 3 OSDs
752         return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
753
754
755 class InteractiveFailureResult(unittest.TextTestResult):
756     """
757     Specialization that implements interactive-on-error style
758     behavior.
759     """
760     def addFailure(self, test, err):
761         super(InteractiveFailureResult, self).addFailure(test, err)
762         log.error(self._exc_info_to_string(err, test))
763         log.error("Failure in test '{0}', going interactive".format(
764             self.getDescription(test)
765         ))
766         interactive.task(ctx=None, config=None)
767
768     def addError(self, test, err):
769         super(InteractiveFailureResult, self).addError(test, err)
770         log.error(self._exc_info_to_string(err, test))
771         log.error("Error in test '{0}', going interactive".format(
772             self.getDescription(test)
773         ))
774         interactive.task(ctx=None, config=None)
775
776
777 def enumerate_methods(s):
778     log.info("e: {0}".format(s))
779     for t in s._tests:
780         if isinstance(t, suite.BaseTestSuite):
781             for sub in enumerate_methods(t):
782                 yield sub
783         else:
784             yield s, t
785
786
787 def load_tests(modules, loader):
788     if modules:
789         log.info("Executing modules: {0}".format(modules))
790         module_suites = []
791         for mod_name in modules:
792             # Test names like cephfs.test_auto_repair
793             module_suites.append(loader.loadTestsFromName(mod_name))
794         log.info("Loaded: {0}".format(list(module_suites)))
795         return suite.TestSuite(module_suites)
796     else:
797         log.info("Executing all cephfs tests")
798         return loader.discover(
799             os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
800         )
801
802
803 def scan_tests(modules):
804     overall_suite = load_tests(modules, loader.TestLoader())
805
806     max_required_mds = 0
807     max_required_clients = 0
808     max_required_mgr = 0
809
810     for suite, case in enumerate_methods(overall_suite):
811         max_required_mds = max(max_required_mds,
812                                getattr(case, "MDSS_REQUIRED", 0))
813         max_required_clients = max(max_required_clients,
814                                getattr(case, "CLIENTS_REQUIRED", 0))
815         max_required_mgr = max(max_required_mgr,
816                                getattr(case, "MGRS_REQUIRED", 0))
817
818     return max_required_mds, max_required_clients, max_required_mgr
819
820
821 class LocalCluster(object):
822     def __init__(self, rolename="placeholder"):
823         self.remotes = {
824             LocalRemote(): [rolename]
825         }
826
827     def only(self, requested):
828         return self.__class__(rolename=requested)
829
830
831 class LocalContext(object):
832     def __init__(self):
833         self.config = {}
834         self.teuthology_config = teuth_config
835         self.cluster = LocalCluster()
836         self.daemons = DaemonGroup()
837
838         # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
839         # tests that want to look these up via ctx can do so.
840         # Inspect ceph.conf to see what roles exist
841         for conf_line in open("ceph.conf").readlines():
842             for svc_type in ["mon", "osd", "mds", "mgr"]:
843                 if svc_type not in self.daemons.daemons:
844                     self.daemons.daemons[svc_type] = {}
845                 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
846                 if match:
847                     svc_id = match.group(1)
848                     self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
849
850     def __del__(self):
851         shutil.rmtree(self.teuthology_config['test_path'])
852
853
854 def exec_test():
855     # Parse arguments
856     interactive_on_error = False
857     create_cluster = False
858
859     args = sys.argv[1:]
860     flags = [a for a in args if a.startswith("-")]
861     modules = [a for a in args if not a.startswith("-")]
862     for f in flags:
863         if f == "--interactive":
864             interactive_on_error = True
865         elif f == "--create":
866             create_cluster = True
867         else:
868             log.error("Unknown option '{0}'".format(f))
869             sys.exit(-1)
870
871     # Help developers by stopping up-front if their tree isn't built enough for all the
872     # tools that the tests might want to use (add more here if needed)
873     require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
874                         "cephfs-table-tool", "ceph-fuse", "rados"]
875     missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
876     if missing_binaries:
877         log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
878         sys.exit(-1)
879
880     max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)
881
882     remote = LocalRemote()
883
884     # Tolerate no MDSs or clients running at start
885     ps_txt = remote.run(
886         args=["ps", "-u"+str(os.getuid())]
887     ).stdout.getvalue().strip()
888     lines = ps_txt.split("\n")[1:]
889     for line in lines:
890         if 'ceph-fuse' in line or 'ceph-mds' in line:
891             pid = int(line.split()[0])
892             log.warn("Killing stray process {0}".format(line))
893             os.kill(pid, signal.SIGKILL)
894
895     # Fire up the Ceph cluster if the user requested it
896     if create_cluster:
897         log.info("Creating cluster with {0} MDS daemons".format(
898             max_required_mds))
899         remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
900         remote.run(["rm", "-rf", "./out"])
901         remote.run(["rm", "-rf", "./dev"])
902         vstart_env = os.environ.copy()
903         vstart_env["FS"] = "0"
904         vstart_env["MDS"] = max_required_mds.__str__()
905         vstart_env["OSD"] = "1"
906         vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
907
908         remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
909                    env=vstart_env)
910
911         # Wait for OSD to come up so that subsequent injectargs etc will
912         # definitely succeed
913         LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
914
915     # List of client mounts, sufficient to run the selected tests
916     clients = [i.__str__() for i in range(0, max_required_clients)]
917
918     test_dir = tempfile.mkdtemp()
919     teuth_config['test_path'] = test_dir
920
921     # Construct Mount classes
922     mounts = []
923     for client_id in clients:
924         # Populate client keyring (it sucks to use client.admin for test clients
925         # because it's awkward to find the logs later)
926         client_name = "client.{0}".format(client_id)
927
928         if client_name not in open("./keyring").read():
929             p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
930                                  "osd", "allow rw",
931                                  "mds", "allow",
932                                  "mon", "allow r"])
933
934             open("./keyring", "a").write(p.stdout.getvalue())
935
936         mount = LocalFuseMount(test_dir, client_id)
937         mounts.append(mount)
938         if mount.is_mounted():
939             log.warn("unmounting {0}".format(mount.mountpoint))
940             mount.umount_wait()
941         else:
942             if os.path.exists(mount.mountpoint):
943                 os.rmdir(mount.mountpoint)
944
945     ctx = LocalContext()
946     ceph_cluster = LocalCephCluster(ctx)
947     mds_cluster = LocalMDSCluster(ctx)
948     mgr_cluster = LocalMgrCluster(ctx)
949
950     from tasks.cephfs_test_runner import DecoratingLoader
951
952     class LogStream(object):
953         def __init__(self):
954             self.buffer = ""
955
956         def write(self, data):
957             self.buffer += data
958             if "\n" in self.buffer:
959                 lines = self.buffer.split("\n")
960                 for line in lines[:-1]:
961                     pass
962                     # sys.stderr.write(line + "\n")
963                     log.info(line)
964                 self.buffer = lines[-1]
965
966         def flush(self):
967             pass
968
969     decorating_loader = DecoratingLoader({
970         "ctx": ctx,
971         "mounts": mounts,
972         "ceph_cluster": ceph_cluster,
973         "mds_cluster": mds_cluster,
974         "mgr_cluster": mgr_cluster,
975     })
976
977     # For the benefit of polling tests like test_full -- in teuthology land we set this
978     # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
979     remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
980     ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
981
982     # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
983     # from normal IO latency.  Increase it for running teests.
984     ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
985
986     # Make sure the filesystem created in tests has uid/gid that will let us talk to
987     # it after mounting it (without having to  go root).  Set in 'global' not just 'mds'
988     # so that cephfs-data-scan will pick it up too.
989     ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
990     ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
991
992     # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
993     def _get_package_version(remote, pkg_name):
994         # Used in cephfs tests to find fuse version.  Your development workstation *does* have >=2.9, right?
995         return "2.9"
996
997     import teuthology.packaging
998     teuthology.packaging.get_package_version = _get_package_version
999
1000     overall_suite = load_tests(modules, decorating_loader)
1001
1002     # Filter out tests that don't lend themselves to interactive running,
1003     victims = []
1004     for case, method in enumerate_methods(overall_suite):
1005         fn = getattr(method, method._testMethodName)
1006
1007         drop_test = False
1008
1009         if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1010             drop_test = True
1011             log.warn("Dropping test because long running: ".format(method.id()))
1012
1013         if getattr(fn, "needs_trimming", False) is True:
1014             drop_test = (os.getuid() != 0)
1015             log.warn("Dropping test because client trim unavailable: ".format(method.id()))
1016
1017         if drop_test:
1018             # Don't drop the test if it was explicitly requested in arguments
1019             is_named = False
1020             for named in modules:
1021                 if named.endswith(method.id()):
1022                     is_named = True
1023                     break
1024
1025             if not is_named:
1026                 victims.append((case, method))
1027
1028     log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1029     for s, method in victims:
1030         s._tests.remove(method)
1031
1032     if interactive_on_error:
1033         result_class = InteractiveFailureResult
1034     else:
1035         result_class = unittest.TextTestResult
1036     fail_on_skip = False
1037
1038     class LoggingResult(result_class):
1039         def startTest(self, test):
1040             log.info("Starting test: {0}".format(self.getDescription(test)))
1041             test.started_at = datetime.datetime.utcnow()
1042             return super(LoggingResult, self).startTest(test)
1043
1044         def stopTest(self, test):
1045             log.info("Stopped test: {0} in {1}s".format(
1046                 self.getDescription(test),
1047                 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1048             ))
1049
1050         def addSkip(self, test, reason):
1051             if fail_on_skip:
1052                 # Don't just call addFailure because that requires a traceback
1053                 self.failures.append((test, reason))
1054             else:
1055                 super(LoggingResult, self).addSkip(test, reason)
1056
1057     # Execute!
1058     result = unittest.TextTestRunner(
1059         stream=LogStream(),
1060         resultclass=LoggingResult,
1061         verbosity=2,
1062         failfast=True).run(overall_suite)
1063
1064     if not result.wasSuccessful():
1065         result.printErrors()  # duplicate output at end for convenience
1066
1067         bad_tests = []
1068         for test, error in result.errors:
1069             bad_tests.append(str(test))
1070         for test, failure in result.failures:
1071             bad_tests.append(str(test))
1072
1073         sys.exit(-1)
1074     else:
1075         sys.exit(0)
1076
1077
1078 if __name__ == "__main__":
1079     exec_test()