2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
4 quickly during development.
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
8 # Activate the teuthology virtualenv
9 source ~/git/teuthology/virtualenv/bin/activate
10 # Go into your ceph build directory
12 # Invoke a test using this script
13 python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
17 # Alternatively, if you use different paths, specify them as follows:
18 LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py
20 # If you wish to drop to a python shell on failures, use --interactive:
21 python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
23 # If you wish to run a named test case, pass it as an argument:
24 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
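# You can also narrow the run to a single test class or method by giving its full
# dotted path (illustrative example; any name accepted by unittest's loadTestsFromName works):
python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan.TestDataScan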
28 from StringIO import StringIO
29 from collections import defaultdict
42 from unittest import suite, loader
45 from teuthology.orchestra.run import Raw, quote
46 from teuthology.orchestra.daemon import DaemonGroup
47 from teuthology.config import config as teuth_config
51 log = logging.getLogger(__name__)
53 handler = logging.FileHandler("./vstart_runner.log")
54 formatter = logging.Formatter(
55 fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
56 datefmt='%Y-%m-%dT%H:%M:%S')
57 handler.setFormatter(formatter)
58 log.addHandler(handler)
59 log.setLevel(logging.INFO)
62 def respawn_in_path(lib_path, python_paths):
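# A note on why we exec: the dynamic linker only reads (DY)LD_LIBRARY_PATH at
# process startup, so after adjusting the environment we re-exec the interpreter
# below for the new value to take effect.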
63 execv_cmd = ['python']
64 if platform.system() == "Darwin":
65 lib_path_var = "DYLD_LIBRARY_PATH"
67 lib_path_var = "LD_LIBRARY_PATH"
69 py_binary = os.environ.get("PYTHON", "python")
71 if lib_path_var in os.environ:
72 if lib_path not in os.environ[lib_path_var]:
73 os.environ[lib_path_var] += ':' + lib_path
74 os.execvp(py_binary, execv_cmd + sys.argv)
76 os.environ[lib_path_var] = lib_path
77 os.execvp(py_binary, execv_cmd + sys.argv)
79 for p in python_paths:
83 # Let's use some sensible defaults
84 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
86 # A list of candidate paths for each package we need
88 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
89 ["lib/cython_modules/lib.2"],
95 # Up one level so that "tasks.foo.bar" imports work
96 python_paths.append(os.path.abspath(
97 os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
100 for package_guesses in guesses:
101 for g in package_guesses:
102 g_exp = os.path.abspath(os.path.expanduser(g))
103 if os.path.exists(g_exp):
104 python_paths.append(g_exp)
106 ld_path = os.path.join(os.getcwd(), "lib/")
107 print "Using guessed paths {0} {1}".format(ld_path, python_paths)
108 respawn_in_path(ld_path, python_paths)
112 from teuthology.exceptions import CommandFailedError
113 from tasks.ceph_manager import CephManager
114 from tasks.cephfs.fuse_mount import FuseMount
115 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
116 from mgr.mgr_test_case import MgrCluster
117 from teuthology.contextutil import MaxWhileTries
118 from teuthology.task import interactive
120 sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
121 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
124 # Must import after teuthology because of gevent monkey patching
127 if os.path.exists("./CMakeCache.txt"):
128 # Running in build dir of a cmake build
129 BIN_PREFIX = "./bin/"
130 SRC_PREFIX = "../src"
132 # Running in src/ of an autotools build
137 class LocalRemoteProcess(object):
138 def __init__(self, args, subproc, check_status, stdout, stderr):
140 self.subproc = subproc
142 self.stdout = StringIO()
147 self.stderr = StringIO()
151 self.check_status = check_status
152 self.exitstatus = self.returncode = None
156 # Avoid calling communicate() on a dead process because it'll
157 # complain about std* already being closed
158 if self.exitstatus != 0:
159 raise CommandFailedError(self.args, self.exitstatus)
163 out, err = self.subproc.communicate()
164 self.stdout.write(out)
165 self.stderr.write(err)
167 self.exitstatus = self.returncode = self.subproc.returncode
169 if self.exitstatus != 0:
170 sys.stderr.write(out)
171 sys.stderr.write(err)
173 if self.check_status and self.exitstatus != 0:
174 raise CommandFailedError(self.args, self.exitstatus)
178 if self.exitstatus is not None:
181 if self.subproc.poll() is not None:
182 out, err = self.subproc.communicate()
183 self.stdout.write(out)
184 self.stderr.write(err)
185 self.exitstatus = self.returncode = self.subproc.returncode
192 if self.subproc.pid and not self.finished:
193 log.info("kill: killing pid {0} ({1})".format(
194 self.subproc.pid, self.args))
195 safe_kill(self.subproc.pid)
197 log.info("kill: already terminated ({0})".format(self.args))
201 class FakeStdIn(object):
202 def __init__(self, mount_daemon):
203 self.mount_daemon = mount_daemon
206 self.mount_daemon.kill()
208 return FakeStdIn(self)
211 class LocalRemote(object):
213 Amusingly named class to present the teuthology RemoteProcess interface when we are really
214 running things locally for vstart
216 Run this inside your src/ dir!
221 self.hostname = "localhost"
222 self.user = getpass.getuser()
224 def get_file(self, path, sudo, dest_dir):
225 tmpfile = tempfile.NamedTemporaryFile(delete=False).name
226 shutil.copy(path, tmpfile)
229 def put_file(self, src, dst, sudo=False):
230 shutil.copy(src, dst)
232 def run(self, args, check_status=True, wait=True,
233 stdout=None, stderr=None, cwd=None, stdin=None,
234 logger=None, label=None, env=None):
235 log.info("run args={0}".format(args))
237 # We don't need no stinkin' sudo
238 args = [a for a in args if a != "sudo"]
240 # We have to use shell=True if any run.Raw was present, e.g. &&
241 shell = any([a for a in args if isinstance(a, Raw)])
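# Illustrative example: an args list like ["ls", Raw("&&"), "echo", "ok"] cannot be
# passed to Popen as a list, so it gets joined into a single shell string via quote() below.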
247 if args[i] == 'adjust-ulimits':
249 elif args[i] == 'ceph-coverage':
251 elif args[i] == 'timeout':
254 filtered.append(args[i])
257 args = quote(filtered)
258 log.info("Running {0}".format(args))
260 subproc = subprocess.Popen(args,
261 stdout=subprocess.PIPE,
262 stderr=subprocess.PIPE,
263 stdin=subprocess.PIPE,
267 log.info("Running {0}".format(args))
270 if not isinstance(arg, basestring):
271 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
275 subproc = subprocess.Popen(args,
276 stdout=subprocess.PIPE,
277 stderr=subprocess.PIPE,
278 stdin=subprocess.PIPE,
283 if not isinstance(stdin, basestring):
284 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
286 # Hack: writing to stdin is not deadlock-safe, but it "always" works
287 # as long as the input buffer is "small"
288 subproc.stdin.write(stdin)
290 proc = LocalRemoteProcess(
291 args, subproc, check_status,
301 class LocalDaemon(object):
302 def __init__(self, daemon_type, daemon_id):
303 self.daemon_type = daemon_type
304 self.daemon_id = daemon_id
305 self.controller = LocalRemote()
313 return self._get_pid() is not None
317 Return PID as an integer or None if not found
319 ps_txt = self.controller.run(
320 args=["ps", "ww", "-u"+str(os.getuid())]
321 ).stdout.getvalue().strip()
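# An entry we are scanning for looks roughly like this (illustrative):
#   12345 pts/0    Sl     0:01 ./bin/ceph-mds -i a ...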
322 lines = ps_txt.split("\n")[1:]
325 if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
326 log.info("Found ps line for daemon: {0}".format(line))
327 return int(line.split()[0])
328 log.info("No match for {0} {1}: {2}".format(
329 self.daemon_type, self.daemon_id, ps_txt
333 def wait(self, timeout):
335 while self._get_pid() is not None:
337 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
341 def stop(self, timeout=300):
342 if not self.running():
343 log.error('tried to stop a non-running daemon')
346 pid = self._get_pid()
347 log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
348 os.kill(pid, signal.SIGKILL)
351 while pid is not None:
352 new_pid = self._get_pid()
353 if new_pid is not None and new_pid != pid:
354 log.info("Killing new PID {0}".format(new_pid))
356 os.kill(pid, signal.SIGKILL)
363 "Timed out waiting for daemon {0}.{1}".format(
364 self.daemon_type, self.daemon_id))
368 self.wait(timeout=timeout)
371 if self._get_pid() is not None:
374 self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
379 os.kill annoyingly raises an exception if the process is already dead. Ignore it.
382 return os.kill(pid, signal.SIGKILL)
384 if e.errno == errno.ESRCH:
385 # Raced with process termination
391 class LocalFuseMount(FuseMount):
392 def __init__(self, test_dir, client_id):
393 super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
396 def config_path(self):
399 def get_keyring_path(self):
400 # This is going to end up in a config file, so use an absolute path
401 # to avoid assumptions about daemons' pwd
402 return os.path.abspath("./client.{0}.keyring".format(self.client_id))
404 def run_shell(self, args, wait=True):
405 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
406 # the "cd foo && bar" shenanigans aren't needed to begin with and
407 # then we wouldn't have to special-case this
408 return self.client_remote.run(
409 args, wait=wait, cwd=self.mountpoint
416 def _asok_path(self):
417 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
418 # run in the foreground. When running it daemonized, however, the asok is named after
419 # the PID of the launching process, not the long-running ceph-fuse process. Therefore
420 # we need to give an exact path here, as the logic for checking /proc/ to find which
421 # asok is alive does not work.
422 path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
423 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
427 if self.is_mounted():
428 super(LocalFuseMount, self).umount()
430 def mount(self, mount_path=None, mount_fs_name=None):
431 self.client_remote.run(
439 def list_connections():
440 self.client_remote.run(
441 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
444 p = self.client_remote.run(
445 args=["ls", "/sys/fs/fuse/connections"],
448 if p.exitstatus != 0:
449 log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
452 ls_str = p.stdout.getvalue().strip()
454 return [int(n) for n in ls_str.split("\n")]
458 # Before starting ceph-fuse process, note the contents of
459 # /sys/fs/fuse/connections
460 pre_mount_conns = list_connections()
461 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
463 prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
465 prefix += ["--client-die-on-failed-remount=false"]
467 if mount_path is not None:
468 prefix += ["--client_mountpoint={0}".format(mount_path)]
470 if mount_fs_name is not None:
471 prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]
473 self.fuse_daemon = self.client_remote.run(args=
477 "client.{0}".format(self.client_id),
481 log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
483 # Wait for the connection reference to appear in /sys
485 post_mount_conns = list_connections()
486 while len(post_mount_conns) <= len(pre_mount_conns):
487 if self.fuse_daemon.finished:
488 # Did mount fail? Raise the CommandFailedError instead of
489 # hitting the "failed to populate /sys/" timeout
490 self.fuse_daemon.wait()
494 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
497 post_mount_conns = list_connections()
499 log.info("Post-mount connections: {0}".format(post_mount_conns))
501 # Record our fuse connection number so that we can use it when
503 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
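# e.g. (illustrative) pre_mount_conns=[38] and post_mount_conns=[38, 39] leave
# new_conns=[39], the connection created by this mount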
504 if len(new_conns) == 0:
505 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
506 elif len(new_conns) > 1:
507 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
509 self._fuse_conn = new_conns[0]
511 def _run_python(self, pyscript):
513 Override this to remove the daemon-helper prefix that is used otherwise
514 to make the process killable.
516 return self.client_remote.run(args=[
517 'python', '-c', pyscript
521 class LocalCephManager(CephManager):
523 # Deliberately skip parent init, only inheriting from it to get
524 # util methods like osd_dump that sit on top of raw_cluster_cmd
525 self.controller = LocalRemote()
527 # A minority of CephManager functions actually take this lock, for the cases
528 # where teuthology tests want to run tasks in parallel
529 self.lock = threading.RLock()
531 self.log = lambda x: log.info(x)
533 def find_remote(self, daemon_type, daemon_id):
535 daemon_type like 'mds', 'osd'
536 daemon_id like 'a', '0'
540 def run_ceph_w(self):
541 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
544 def raw_cluster_cmd(self, *args):
546 args like ["osd", "dump"]
549 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
550 return proc.stdout.getvalue()
552 def raw_cluster_cmd_result(self, *args):
554 like raw_cluster_cmd but don't check status, just return rc
556 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
557 return proc.exitstatus
559 def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
560 return self.controller.run(
561 args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
565 def get_mds_status(self, mds):
567 Run cluster commands for the mds in order to get mds information
569 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
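# The first line of the dump output is a human-readable status line rather than
# part of the JSON (something like "dumped fsmap epoch 1"; illustrative), so skip it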
570 j = json.loads(' '.join(out.splitlines()[1:]))
571 # collate; for dup ids, larger gid wins.
572 for info in j['info'].itervalues():
573 if info['name'] == mds:
578 def get_mds_status_by_rank(self, rank):
580 Run cluster commands for the mds in order to get mds information
583 j = self.get_mds_status_all()
584 # collate; for dup ids, larger gid wins.
585 for info in j['info'].itervalues():
586 if info['rank'] == rank:
590 def get_mds_status_all(self):
592 Run cluster command to extract all the mds status.
594 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
595 j = json.loads(' '.join(out.splitlines()[1:]))
599 class LocalCephCluster(CephCluster):
600 def __init__(self, ctx):
601 # Deliberately skip calling parent constructor
603 self.mon_manager = LocalCephManager()
604 self._conf = defaultdict(dict)
607 def admin_remote(self):
610 def get_config(self, key, service_type=None):
611 if service_type is None:
614 # FIXME hardcoded vstart service IDs
621 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
623 def _write_conf(self):
624 # In teuthology, we have the honour of writing the entire ceph.conf, but
625 # in vstart land it has mostly already been written and we need to carefully
627 conf_path = "./ceph.conf"
628 banner = "\n#LOCAL_TEST\n"
629 existing_str = open(conf_path).read()
631 if banner in existing_str:
632 existing_str = existing_str[0:existing_str.find(banner)]
634 existing_str += banner
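# Illustratively, with the overrides this runner sets later, the appended block
# ends up looking something like:
#
#   #LOCAL_TEST
#   [mds]
#   mds log max segments = 10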
636 for subsys, kvs in self._conf.items():
637 existing_str += "\n[{0}]\n".format(subsys)
638 for key, val in kvs.items():
639 # Comment out existing instance if it exists
640 log.info("Searching for existing instance {0}/{1}".format(
643 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
645 ), existing_str, re.MULTILINE)
648 section_str = existing_str[existing_section.start():existing_section.end()]
649 existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
651 start = existing_section.start() + existing_val.start(1)
652 log.info("Found string to replace at {0}".format(
655 existing_str = existing_str[0:start] + "#" + existing_str[start:]
657 existing_str += "{0} = {1}\n".format(key, val)
659 open(conf_path, "w").write(existing_str)
661 def set_ceph_conf(self, subsys, key, value):
662 self._conf[subsys][key] = value
665 def clear_ceph_conf(self, subsys, key):
666 del self._conf[subsys][key]
670 class LocalMDSCluster(LocalCephCluster, MDSCluster):
671 def __init__(self, ctx):
672 super(LocalMDSCluster, self).__init__(ctx)
674 self.mds_ids = ctx.daemons.daemons['mds'].keys()
675 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
677 def clear_firewall(self):
678 # FIXME: unimplemented
681 def newfs(self, name='cephfs', create=True):
682 return LocalFilesystem(self._ctx, name=name, create=create)
685 class LocalMgrCluster(LocalCephCluster, MgrCluster):
686 def __init__(self, ctx):
687 super(LocalMgrCluster, self).__init__(ctx)
689 self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
690 self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
693 class LocalFilesystem(Filesystem, LocalMDSCluster):
694 def __init__(self, ctx, fscid=None, name='cephfs', create=False):
695 # Deliberately skip calling parent constructor
700 self.metadata_pool_name = None
701 self.metadata_overlay = False
702 self.data_pool_name = None
703 self.data_pools = None
705 # Hack: cheeky inspection of ceph.conf to see what MDSs exist
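# vstart writes one conf section per daemon, so we are matching lines such as
# "[mds.a]" or "[mds.b]" (illustrative)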
707 for line in open("ceph.conf").readlines():
708 match = re.match("^\[mds\.(.+)\]$", line)
710 self.mds_ids.add(match.group(1))
713 raise RuntimeError("No MDSs found in ceph.conf!")
715 self.mds_ids = list(self.mds_ids)
717 log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
719 self.mon_manager = LocalCephManager()
721 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
723 self.client_remote = LocalRemote()
725 self._conf = defaultdict(dict)
728 if fscid is not None:
729 raise RuntimeError("cannot specify fscid when creating fs")
730 if create and not self.legacy_configured():
733 if fscid is not None:
735 self.getinfo(refresh=True)
737 # Stash a reference to the first created filesystem on ctx, so
738 # that if someone drops to the interactive shell they can easily
740 if not hasattr(self._ctx, "filesystem"):
741 self._ctx.filesystem = self
747 def set_clients_block(self, blocked, mds_id=None):
748 raise NotImplementedError()
750 def get_pgs_per_fs_pool(self):
751 # FIXME: assuming there are 3 OSDs
752 return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
755 class InteractiveFailureResult(unittest.TextTestResult):
757 Specialization that implements interactive-on-error style
760 def addFailure(self, test, err):
761 super(InteractiveFailureResult, self).addFailure(test, err)
762 log.error(self._exc_info_to_string(err, test))
763 log.error("Failure in test '{0}', going interactive".format(
764 self.getDescription(test)
766 interactive.task(ctx=None, config=None)
768 def addError(self, test, err):
769 super(InteractiveFailureResult, self).addError(test, err)
770 log.error(self._exc_info_to_string(err, test))
771 log.error("Error in test '{0}', going interactive".format(
772 self.getDescription(test)
774 interactive.task(ctx=None, config=None)
777 def enumerate_methods(s):
778 log.info("e: {0}".format(s))
780 if isinstance(t, suite.BaseTestSuite):
781 for sub in enumerate_methods(t):
787 def load_tests(modules, loader):
789 log.info("Executing modules: {0}".format(modules))
791 for mod_name in modules:
792 # Test names like cephfs.test_auto_repair
793 module_suites.append(loader.loadTestsFromName(mod_name))
794 log.info("Loaded: {0}".format(list(module_suites)))
795 return suite.TestSuite(module_suites)
797 log.info("Executing all cephfs tests")
798 return loader.discover(
799 os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
803 def scan_tests(modules):
804 overall_suite = load_tests(modules, loader.TestLoader())
807 max_required_clients = 0
810 for suite, case in enumerate_methods(overall_suite):
811 max_required_mds = max(max_required_mds,
812 getattr(case, "MDSS_REQUIRED", 0))
813 max_required_clients = max(max_required_clients,
814 getattr(case, "CLIENTS_REQUIRED", 0))
815 max_required_mgr = max(max_required_mgr,
816 getattr(case, "MGRS_REQUIRED", 0))
818 return max_required_mds, max_required_clients, max_required_mgr
821 class LocalCluster(object):
822 def __init__(self, rolename="placeholder"):
824 LocalRemote(): [rolename]
827 def only(self, requested):
828 return self.__class__(rolename=requested)
831 class LocalContext(object):
834 self.teuthology_config = teuth_config
835 self.cluster = LocalCluster()
836 self.daemons = DaemonGroup()
838 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
839 # tests that want to look these up via ctx can do so.
840 # Inspect ceph.conf to see what roles exist
841 for conf_line in open("ceph.conf").readlines():
842 for svc_type in ["mon", "osd", "mds", "mgr"]:
843 if svc_type not in self.daemons.daemons:
844 self.daemons.daemons[svc_type] = {}
845 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
847 svc_id = match.group(1)
848 self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
851 shutil.rmtree(self.teuthology_config['test_path'])
856 interactive_on_error = False
857 create_cluster = False
860 flags = [a for a in args if a.startswith("-")]
861 modules = [a for a in args if not a.startswith("-")]
863 if f == "--interactive":
864 interactive_on_error = True
865 elif f == "--create":
866 create_cluster = True
868 log.error("Unknown option '{0}'".format(f))
871 # Help developers by stopping up-front if their tree isn't built enough for all the
872 # tools that the tests might want to use (add more here if needed)
873 require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
874 "cephfs-table-tool", "ceph-fuse", "rados"]
875 missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
877 log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
880 max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)
882 remote = LocalRemote()
884 # We tolerate no MDSs or clients already running at start: find any strays and kill them
886 args=["ps", "-u"+str(os.getuid())]
887 ).stdout.getvalue().strip()
888 lines = ps_txt.split("\n")[1:]
890 if 'ceph-fuse' in line or 'ceph-mds' in line:
891 pid = int(line.split()[0])
892 log.warn("Killing stray process {0}".format(line))
893 os.kill(pid, signal.SIGKILL)
895 # Fire up the Ceph cluster if the user requested it
897 log.info("Creating cluster with {0} MDS daemons".format(
899 remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
900 remote.run(["rm", "-rf", "./out"])
901 remote.run(["rm", "-rf", "./dev"])
902 vstart_env = os.environ.copy()
903 vstart_env["FS"] = "0"
904 vstart_env["MDS"] = max_required_mds.__str__()
905 vstart_env["OSD"] = "1"
906 vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
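# Roughly equivalent to running (illustrative):
#   FS=0 MDS=<max_required_mds> OSD=1 MGR=<max_required_mgr> ../src/vstart.sh -n -d --nolockdep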
908 remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
911 # Wait for OSD to come up so that subsequent injectargs etc will
913 LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
915 # List of client mounts, sufficient to run the selected tests
916 clients = [i.__str__() for i in range(0, max_required_clients)]
918 test_dir = tempfile.mkdtemp()
919 teuth_config['test_path'] = test_dir
921 # Construct Mount classes
923 for client_id in clients:
924 # Populate client keyring (it sucks to use client.admin for test clients
925 # because it's awkward to find the logs later)
926 client_name = "client.{0}".format(client_id)
928 if client_name not in open("./keyring").read():
929 p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
934 open("./keyring", "a").write(p.stdout.getvalue())
936 mount = LocalFuseMount(test_dir, client_id)
938 if mount.is_mounted():
939 log.warn("unmounting {0}".format(mount.mountpoint))
942 if os.path.exists(mount.mountpoint):
943 os.rmdir(mount.mountpoint)
946 ceph_cluster = LocalCephCluster(ctx)
947 mds_cluster = LocalMDSCluster(ctx)
948 mgr_cluster = LocalMgrCluster(ctx)
950 from tasks.cephfs_test_runner import DecoratingLoader
952 class LogStream(object):
956 def write(self, data):
958 if "\n" in self.buffer:
959 lines = self.buffer.split("\n")
960 for line in lines[:-1]:
962 # sys.stderr.write(line + "\n")
964 self.buffer = lines[-1]
969 decorating_loader = DecoratingLoader({
972 "ceph_cluster": ceph_cluster,
973 "mds_cluster": mds_cluster,
974 "mgr_cluster": mgr_cluster,
977 # For the benefit of polling tests like test_full -- in teuthology land we set this
978 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
979 remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
980 ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
982 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
983 # from normal IO latency. Increase it for running tests.
984 ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
986 # Make sure the filesystem created in tests has uid/gid that will let us talk to
987 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
988 # so that cephfs-data-scan will pick it up too.
989 ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
990 ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
992 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
993 def _get_package_version(remote, pkg_name):
994 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
997 import teuthology.packaging
998 teuthology.packaging.get_package_version = _get_package_version
1000 overall_suite = load_tests(modules, decorating_loader)
1002 # Filter out tests that don't lend themselves to interactive running,
1004 for case, method in enumerate_methods(overall_suite):
1005 fn = getattr(method, method._testMethodName)
1009 if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1011 log.warn("Dropping test because long running: {0}".format(method.id()))
1013 if getattr(fn, "needs_trimming", False) is True:
1014 drop_test = (os.getuid() != 0)
1015 log.warn("Dropping test because client trim unavailable: {0}".format(method.id()))
1018 # Don't drop the test if it was explicitly requested in arguments
1020 for named in modules:
1021 if named.endswith(method.id()):
1026 victims.append((case, method))
1028 log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1029 for s, method in victims:
1030 s._tests.remove(method)
1032 if interactive_on_error:
1033 result_class = InteractiveFailureResult
1035 result_class = unittest.TextTestResult
1036 fail_on_skip = False
1038 class LoggingResult(result_class):
1039 def startTest(self, test):
1040 log.info("Starting test: {0}".format(self.getDescription(test)))
1041 test.started_at = datetime.datetime.utcnow()
1042 return super(LoggingResult, self).startTest(test)
1044 def stopTest(self, test):
1045 log.info("Stopped test: {0} in {1}s".format(
1046 self.getDescription(test),
1047 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1050 def addSkip(self, test, reason):
1052 # Don't just call addFailure because that requires a traceback
1053 self.failures.append((test, reason))
1055 super(LoggingResult, self).addSkip(test, reason)
1058 result = unittest.TextTestRunner(
1060 resultclass=LoggingResult,
1062 failfast=True).run(overall_suite)
1064 if not result.wasSuccessful():
1065 result.printErrors() # duplicate output at end for convenience
1068 for test, error in result.errors:
1069 bad_tests.append(str(test))
1070 for test, failure in result.failures:
1071 bad_tests.append(str(test))
1078 if __name__ == "__main__":