X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fqa%2Ftasks%2Fvstart_runner.py;fp=src%2Fceph%2Fqa%2Ftasks%2Fvstart_runner.py;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=842e80d4d4e857c9421a50e7e28548e5c1d720ea;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/qa/tasks/vstart_runner.py b/src/ceph/qa/tasks/vstart_runner.py deleted file mode 100644 index 842e80d..0000000 --- a/src/ceph/qa/tasks/vstart_runner.py +++ /dev/null @@ -1,1079 +0,0 @@ -""" -vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart -ceph instance instead of a packaged/installed cluster. Use this to turn around test cases -quickly during development. - -Simple usage (assuming teuthology and ceph checked out in ~/git): - - # Activate the teuthology virtualenv - source ~/git/teuthology/virtualenv/bin/activate - # Go into your ceph build directory - cd ~/git/ceph/build - # Invoke a test using this script - python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan - -Alternative usage: - - # Alternatively, if you use different paths, specify them as follows: - LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py - - # If you wish to drop to a python shell on failures, use --interactive: - python ~/git/ceph/qa/tasks/vstart_runner.py --interactive - - # If you wish to run a named test case, pass it as an argument: - python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan - -""" - -from StringIO import StringIO -from collections import defaultdict -import getpass -import signal -import tempfile -import threading -import datetime -import shutil -import re -import os -import time -import json -import sys -import errno -from unittest import suite, loader -import unittest -import platform -from teuthology.orchestra.run import Raw, quote -from teuthology.orchestra.daemon import DaemonGroup -from teuthology.config import config as teuth_config - -import logging - -log = logging.getLogger(__name__) - -handler = logging.FileHandler("./vstart_runner.log") -formatter = logging.Formatter( - fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s', - datefmt='%Y-%m-%dT%H:%M:%S') -handler.setFormatter(formatter) -log.addHandler(handler) -log.setLevel(logging.INFO) - - -def respawn_in_path(lib_path, python_paths): - execv_cmd = ['python'] - if platform.system() == "Darwin": - lib_path_var = "DYLD_LIBRARY_PATH" - else: - lib_path_var = "LD_LIBRARY_PATH" - - py_binary = os.environ.get("PYTHON", "python") - - if lib_path_var in os.environ: - if lib_path not in os.environ[lib_path_var]: - os.environ[lib_path_var] += ':' + lib_path - os.execvp(py_binary, execv_cmd + sys.argv) - else: - os.environ[lib_path_var] = lib_path - os.execvp(py_binary, execv_cmd + sys.argv) - - for p in python_paths: - sys.path.insert(0, p) - - -# Let's use some sensible defaults -if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"): - - # A list of candidate paths for each package we need - guesses = [ - ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"], - ["lib/cython_modules/lib.2"], - ["../src/pybind"], - ] - - python_paths = [] - - # Up one level so that "tasks.foo.bar" imports work - python_paths.append(os.path.abspath( - os.path.join(os.path.dirname(os.path.realpath(__file__)), "..") - )) - - for package_guesses in guesses: - for g in package_guesses: - g_exp = os.path.abspath(os.path.expanduser(g)) - if os.path.exists(g_exp): - python_paths.append(g_exp) - - ld_path = os.path.join(os.getcwd(), "lib/") - print "Using guessed paths {0} {1}".format(ld_path, python_paths) - respawn_in_path(ld_path, python_paths) - - -try: - from teuthology.exceptions import CommandFailedError - from tasks.ceph_manager import CephManager - from tasks.cephfs.fuse_mount import FuseMount - from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster - from mgr.mgr_test_case import MgrCluster - from teuthology.contextutil import MaxWhileTries - from teuthology.task import interactive -except ImportError: - sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv " - "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n") - raise - -# Must import after teuthology because of gevent monkey patching -import subprocess - -if os.path.exists("./CMakeCache.txt"): - # Running in build dir of a cmake build - BIN_PREFIX = "./bin/" - SRC_PREFIX = "../src" -else: - # Running in src/ of an autotools build - BIN_PREFIX = "./" - SRC_PREFIX = "./" - - -class LocalRemoteProcess(object): - def __init__(self, args, subproc, check_status, stdout, stderr): - self.args = args - self.subproc = subproc - if stdout is None: - self.stdout = StringIO() - else: - self.stdout = stdout - - if stderr is None: - self.stderr = StringIO() - else: - self.stderr = stderr - - self.check_status = check_status - self.exitstatus = self.returncode = None - - def wait(self): - if self.finished: - # Avoid calling communicate() on a dead process because it'll - # give you stick about std* already being closed - if self.exitstatus != 0: - raise CommandFailedError(self.args, self.exitstatus) - else: - return - - out, err = self.subproc.communicate() - self.stdout.write(out) - self.stderr.write(err) - - self.exitstatus = self.returncode = self.subproc.returncode - - if self.exitstatus != 0: - sys.stderr.write(out) - sys.stderr.write(err) - - if self.check_status and self.exitstatus != 0: - raise CommandFailedError(self.args, self.exitstatus) - - @property - def finished(self): - if self.exitstatus is not None: - return True - - if self.subproc.poll() is not None: - out, err = self.subproc.communicate() - self.stdout.write(out) - self.stderr.write(err) - self.exitstatus = self.returncode = self.subproc.returncode - return True - else: - return False - - def kill(self): - log.info("kill ") - if self.subproc.pid and not self.finished: - log.info("kill: killing pid {0} ({1})".format( - self.subproc.pid, self.args)) - safe_kill(self.subproc.pid) - else: - log.info("kill: already terminated ({0})".format(self.args)) - - @property - def stdin(self): - class FakeStdIn(object): - def __init__(self, mount_daemon): - self.mount_daemon = mount_daemon - - def close(self): - self.mount_daemon.kill() - - return FakeStdIn(self) - - -class LocalRemote(object): - """ - Amusingly named class to present the teuthology RemoteProcess interface when we are really - running things locally for vstart - - Run this inside your src/ dir! - """ - - def __init__(self): - self.name = "local" - self.hostname = "localhost" - self.user = getpass.getuser() - - def get_file(self, path, sudo, dest_dir): - tmpfile = tempfile.NamedTemporaryFile(delete=False).name - shutil.copy(path, tmpfile) - return tmpfile - - def put_file(self, src, dst, sudo=False): - shutil.copy(src, dst) - - def run(self, args, check_status=True, wait=True, - stdout=None, stderr=None, cwd=None, stdin=None, - logger=None, label=None, env=None): - log.info("run args={0}".format(args)) - - # We don't need no stinkin' sudo - args = [a for a in args if a != "sudo"] - - # We have to use shell=True if any run.Raw was present, e.g. && - shell = any([a for a in args if isinstance(a, Raw)]) - - if shell: - filtered = [] - i = 0 - while i < len(args): - if args[i] == 'adjust-ulimits': - i += 1 - elif args[i] == 'ceph-coverage': - i += 2 - elif args[i] == 'timeout': - i += 2 - else: - filtered.append(args[i]) - i += 1 - - args = quote(filtered) - log.info("Running {0}".format(args)) - - subproc = subprocess.Popen(args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - cwd=cwd, - shell=True) - else: - log.info("Running {0}".format(args)) - - for arg in args: - if not isinstance(arg, basestring): - raise RuntimeError("Oops, can't handle arg {0} type {1}".format( - arg, arg.__class__ - )) - - subproc = subprocess.Popen(args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - cwd=cwd, - env=env) - - if stdin: - if not isinstance(stdin, basestring): - raise RuntimeError("Can't handle non-string stdins on a vstart cluster") - - # Hack: writing to stdin is not deadlock-safe, but it "always" works - # as long as the input buffer is "small" - subproc.stdin.write(stdin) - - proc = LocalRemoteProcess( - args, subproc, check_status, - stdout, stderr - ) - - if wait: - proc.wait() - - return proc - - -class LocalDaemon(object): - def __init__(self, daemon_type, daemon_id): - self.daemon_type = daemon_type - self.daemon_id = daemon_id - self.controller = LocalRemote() - self.proc = None - - @property - def remote(self): - return LocalRemote() - - def running(self): - return self._get_pid() is not None - - def _get_pid(self): - """ - Return PID as an integer or None if not found - """ - ps_txt = self.controller.run( - args=["ps", "ww", "-u"+str(os.getuid())] - ).stdout.getvalue().strip() - lines = ps_txt.split("\n")[1:] - - for line in lines: - if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1: - log.info("Found ps line for daemon: {0}".format(line)) - return int(line.split()[0]) - log.info("No match for {0} {1}: {2}".format( - self.daemon_type, self.daemon_id, ps_txt - )) - return None - - def wait(self, timeout): - waited = 0 - while self._get_pid() is not None: - if waited > timeout: - raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id)) - time.sleep(1) - waited += 1 - - def stop(self, timeout=300): - if not self.running(): - log.error('tried to stop a non-running daemon') - return - - pid = self._get_pid() - log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id)) - os.kill(pid, signal.SIGKILL) - - waited = 0 - while pid is not None: - new_pid = self._get_pid() - if new_pid is not None and new_pid != pid: - log.info("Killing new PID {0}".format(new_pid)) - pid = new_pid - os.kill(pid, signal.SIGKILL) - - if new_pid is None: - break - else: - if waited > timeout: - raise MaxWhileTries( - "Timed out waiting for daemon {0}.{1}".format( - self.daemon_type, self.daemon_id)) - time.sleep(1) - waited += 1 - - self.wait(timeout=timeout) - - def restart(self): - if self._get_pid() is not None: - self.stop() - - self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id]) - - -def safe_kill(pid): - """ - os.kill annoyingly raises exception if process already dead. Ignore it. - """ - try: - return os.kill(pid, signal.SIGKILL) - except OSError as e: - if e.errno == errno.ESRCH: - # Raced with process termination - pass - else: - raise - - -class LocalFuseMount(FuseMount): - def __init__(self, test_dir, client_id): - super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote()) - - @property - def config_path(self): - return "./ceph.conf" - - def get_keyring_path(self): - # This is going to end up in a config file, so use an absolute path - # to avoid assumptions about daemons' pwd - return os.path.abspath("./client.{0}.keyring".format(self.client_id)) - - def run_shell(self, args, wait=True): - # FIXME maybe should add a pwd arg to teuthology.orchestra so that - # the "cd foo && bar" shenanigans isn't needed to begin with and - # then we wouldn't have to special case this - return self.client_remote.run( - args, wait=wait, cwd=self.mountpoint - ) - - @property - def _prefix(self): - return BIN_PREFIX - - def _asok_path(self): - # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's - # run foreground. When running it daemonized however, the asok is named after - # the PID of the launching process, not the long running ceph-fuse process. Therefore - # we need to give an exact path here as the logic for checking /proc/ for which - # asok is alive does not work. - path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid) - log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid)) - return path - - def umount(self): - if self.is_mounted(): - super(LocalFuseMount, self).umount() - - def mount(self, mount_path=None, mount_fs_name=None): - self.client_remote.run( - args=[ - 'mkdir', - '--', - self.mountpoint, - ], - ) - - def list_connections(): - self.client_remote.run( - args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"], - check_status=False - ) - p = self.client_remote.run( - args=["ls", "/sys/fs/fuse/connections"], - check_status=False - ) - if p.exitstatus != 0: - log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus)) - return [] - - ls_str = p.stdout.getvalue().strip() - if ls_str: - return [int(n) for n in ls_str.split("\n")] - else: - return [] - - # Before starting ceph-fuse process, note the contents of - # /sys/fs/fuse/connections - pre_mount_conns = list_connections() - log.info("Pre-mount connections: {0}".format(pre_mount_conns)) - - prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")] - if os.getuid() != 0: - prefix += ["--client-die-on-failed-remount=false"] - - if mount_path is not None: - prefix += ["--client_mountpoint={0}".format(mount_path)] - - if mount_fs_name is not None: - prefix += ["--client_mds_namespace={0}".format(mount_fs_name)] - - self.fuse_daemon = self.client_remote.run(args= - prefix + [ - "-f", - "--name", - "client.{0}".format(self.client_id), - self.mountpoint - ], wait=False) - - log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid)) - - # Wait for the connection reference to appear in /sys - waited = 0 - post_mount_conns = list_connections() - while len(post_mount_conns) <= len(pre_mount_conns): - if self.fuse_daemon.finished: - # Did mount fail? Raise the CommandFailedError instead of - # hitting the "failed to populate /sys/" timeout - self.fuse_daemon.wait() - time.sleep(1) - waited += 1 - if waited > 30: - raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format( - waited - )) - post_mount_conns = list_connections() - - log.info("Post-mount connections: {0}".format(post_mount_conns)) - - # Record our fuse connection number so that we can use it when - # forcing an unmount - new_conns = list(set(post_mount_conns) - set(pre_mount_conns)) - if len(new_conns) == 0: - raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns)) - elif len(new_conns) > 1: - raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns)) - else: - self._fuse_conn = new_conns[0] - - def _run_python(self, pyscript): - """ - Override this to remove the daemon-helper prefix that is used otherwise - to make the process killable. - """ - return self.client_remote.run(args=[ - 'python', '-c', pyscript - ], wait=False) - - -class LocalCephManager(CephManager): - def __init__(self): - # Deliberately skip parent init, only inheriting from it to get - # util methods like osd_dump that sit on top of raw_cluster_cmd - self.controller = LocalRemote() - - # A minority of CephManager fns actually bother locking for when - # certain teuthology tests want to run tasks in parallel - self.lock = threading.RLock() - - self.log = lambda x: log.info(x) - - def find_remote(self, daemon_type, daemon_id): - """ - daemon_type like 'mds', 'osd' - daemon_id like 'a', '0' - """ - return LocalRemote() - - def run_ceph_w(self): - proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO()) - return proc - - def raw_cluster_cmd(self, *args): - """ - args like ["osd", "dump"} - return stdout string - """ - proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args)) - return proc.stdout.getvalue() - - def raw_cluster_cmd_result(self, *args): - """ - like raw_cluster_cmd but don't check status, just return rc - """ - proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False) - return proc.exitstatus - - def admin_socket(self, daemon_type, daemon_id, command, check_status=True): - return self.controller.run( - args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status - ) - - # FIXME: copypasta - def get_mds_status(self, mds): - """ - Run cluster commands for the mds in order to get mds information - """ - out = self.raw_cluster_cmd('mds', 'dump', '--format=json') - j = json.loads(' '.join(out.splitlines()[1:])) - # collate; for dup ids, larger gid wins. - for info in j['info'].itervalues(): - if info['name'] == mds: - return info - return None - - # FIXME: copypasta - def get_mds_status_by_rank(self, rank): - """ - Run cluster commands for the mds in order to get mds information - check rank. - """ - j = self.get_mds_status_all() - # collate; for dup ids, larger gid wins. - for info in j['info'].itervalues(): - if info['rank'] == rank: - return info - return None - - def get_mds_status_all(self): - """ - Run cluster command to extract all the mds status. - """ - out = self.raw_cluster_cmd('mds', 'dump', '--format=json') - j = json.loads(' '.join(out.splitlines()[1:])) - return j - - -class LocalCephCluster(CephCluster): - def __init__(self, ctx): - # Deliberately skip calling parent constructor - self._ctx = ctx - self.mon_manager = LocalCephManager() - self._conf = defaultdict(dict) - - @property - def admin_remote(self): - return LocalRemote() - - def get_config(self, key, service_type=None): - if service_type is None: - service_type = 'mon' - - # FIXME hardcoded vstart service IDs - service_id = { - 'mon': 'a', - 'mds': 'a', - 'osd': '0' - }[service_type] - - return self.json_asok(['config', 'get', key], service_type, service_id)[key] - - def _write_conf(self): - # In teuthology, we have the honour of writing the entire ceph.conf, but - # in vstart land it has mostly already been written and we need to carefully - # append to it. - conf_path = "./ceph.conf" - banner = "\n#LOCAL_TEST\n" - existing_str = open(conf_path).read() - - if banner in existing_str: - existing_str = existing_str[0:existing_str.find(banner)] - - existing_str += banner - - for subsys, kvs in self._conf.items(): - existing_str += "\n[{0}]\n".format(subsys) - for key, val in kvs.items(): - # Comment out existing instance if it exists - log.info("Searching for existing instance {0}/{1}".format( - key, subsys - )) - existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format( - subsys - ), existing_str, re.MULTILINE) - - if existing_section: - section_str = existing_str[existing_section.start():existing_section.end()] - existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE) - if existing_val: - start = existing_section.start() + existing_val.start(1) - log.info("Found string to replace at {0}".format( - start - )) - existing_str = existing_str[0:start] + "#" + existing_str[start:] - - existing_str += "{0} = {1}\n".format(key, val) - - open(conf_path, "w").write(existing_str) - - def set_ceph_conf(self, subsys, key, value): - self._conf[subsys][key] = value - self._write_conf() - - def clear_ceph_conf(self, subsys, key): - del self._conf[subsys][key] - self._write_conf() - - -class LocalMDSCluster(LocalCephCluster, MDSCluster): - def __init__(self, ctx): - super(LocalMDSCluster, self).__init__(ctx) - - self.mds_ids = ctx.daemons.daemons['mds'].keys() - self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids]) - - def clear_firewall(self): - # FIXME: unimplemented - pass - - def newfs(self, name='cephfs', create=True): - return LocalFilesystem(self._ctx, name=name, create=create) - - -class LocalMgrCluster(LocalCephCluster, MgrCluster): - def __init__(self, ctx): - super(LocalMgrCluster, self).__init__(ctx) - - self.mgr_ids = ctx.daemons.daemons['mgr'].keys() - self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids]) - - -class LocalFilesystem(Filesystem, LocalMDSCluster): - def __init__(self, ctx, fscid=None, name='cephfs', create=False): - # Deliberately skip calling parent constructor - self._ctx = ctx - - self.id = None - self.name = None - self.metadata_pool_name = None - self.metadata_overlay = False - self.data_pool_name = None - self.data_pools = None - - # Hack: cheeky inspection of ceph.conf to see what MDSs exist - self.mds_ids = set() - for line in open("ceph.conf").readlines(): - match = re.match("^\[mds\.(.+)\]$", line) - if match: - self.mds_ids.add(match.group(1)) - - if not self.mds_ids: - raise RuntimeError("No MDSs found in ceph.conf!") - - self.mds_ids = list(self.mds_ids) - - log.info("Discovered MDS IDs: {0}".format(self.mds_ids)) - - self.mon_manager = LocalCephManager() - - self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids]) - - self.client_remote = LocalRemote() - - self._conf = defaultdict(dict) - - if name is not None: - if fscid is not None: - raise RuntimeError("cannot specify fscid when creating fs") - if create and not self.legacy_configured(): - self.create() - else: - if fscid is not None: - self.id = fscid - self.getinfo(refresh=True) - - # Stash a reference to the first created filesystem on ctx, so - # that if someone drops to the interactive shell they can easily - # poke our methods. - if not hasattr(self._ctx, "filesystem"): - self._ctx.filesystem = self - - @property - def _prefix(self): - return BIN_PREFIX - - def set_clients_block(self, blocked, mds_id=None): - raise NotImplementedError() - - def get_pgs_per_fs_pool(self): - # FIXME: assuming there are 3 OSDs - return 3 * int(self.get_config('mon_pg_warn_min_per_osd')) - - -class InteractiveFailureResult(unittest.TextTestResult): - """ - Specialization that implements interactive-on-error style - behavior. - """ - def addFailure(self, test, err): - super(InteractiveFailureResult, self).addFailure(test, err) - log.error(self._exc_info_to_string(err, test)) - log.error("Failure in test '{0}', going interactive".format( - self.getDescription(test) - )) - interactive.task(ctx=None, config=None) - - def addError(self, test, err): - super(InteractiveFailureResult, self).addError(test, err) - log.error(self._exc_info_to_string(err, test)) - log.error("Error in test '{0}', going interactive".format( - self.getDescription(test) - )) - interactive.task(ctx=None, config=None) - - -def enumerate_methods(s): - log.info("e: {0}".format(s)) - for t in s._tests: - if isinstance(t, suite.BaseTestSuite): - for sub in enumerate_methods(t): - yield sub - else: - yield s, t - - -def load_tests(modules, loader): - if modules: - log.info("Executing modules: {0}".format(modules)) - module_suites = [] - for mod_name in modules: - # Test names like cephfs.test_auto_repair - module_suites.append(loader.loadTestsFromName(mod_name)) - log.info("Loaded: {0}".format(list(module_suites))) - return suite.TestSuite(module_suites) - else: - log.info("Executing all cephfs tests") - return loader.discover( - os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs") - ) - - -def scan_tests(modules): - overall_suite = load_tests(modules, loader.TestLoader()) - - max_required_mds = 0 - max_required_clients = 0 - max_required_mgr = 0 - - for suite, case in enumerate_methods(overall_suite): - max_required_mds = max(max_required_mds, - getattr(case, "MDSS_REQUIRED", 0)) - max_required_clients = max(max_required_clients, - getattr(case, "CLIENTS_REQUIRED", 0)) - max_required_mgr = max(max_required_mgr, - getattr(case, "MGRS_REQUIRED", 0)) - - return max_required_mds, max_required_clients, max_required_mgr - - -class LocalCluster(object): - def __init__(self, rolename="placeholder"): - self.remotes = { - LocalRemote(): [rolename] - } - - def only(self, requested): - return self.__class__(rolename=requested) - - -class LocalContext(object): - def __init__(self): - self.config = {} - self.teuthology_config = teuth_config - self.cluster = LocalCluster() - self.daemons = DaemonGroup() - - # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any - # tests that want to look these up via ctx can do so. - # Inspect ceph.conf to see what roles exist - for conf_line in open("ceph.conf").readlines(): - for svc_type in ["mon", "osd", "mds", "mgr"]: - if svc_type not in self.daemons.daemons: - self.daemons.daemons[svc_type] = {} - match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line) - if match: - svc_id = match.group(1) - self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id) - - def __del__(self): - shutil.rmtree(self.teuthology_config['test_path']) - - -def exec_test(): - # Parse arguments - interactive_on_error = False - create_cluster = False - - args = sys.argv[1:] - flags = [a for a in args if a.startswith("-")] - modules = [a for a in args if not a.startswith("-")] - for f in flags: - if f == "--interactive": - interactive_on_error = True - elif f == "--create": - create_cluster = True - else: - log.error("Unknown option '{0}'".format(f)) - sys.exit(-1) - - # Help developers by stopping up-front if their tree isn't built enough for all the - # tools that the tests might want to use (add more here if needed) - require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan", - "cephfs-table-tool", "ceph-fuse", "rados"] - missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))] - if missing_binaries: - log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries))) - sys.exit(-1) - - max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules) - - remote = LocalRemote() - - # Tolerate no MDSs or clients running at start - ps_txt = remote.run( - args=["ps", "-u"+str(os.getuid())] - ).stdout.getvalue().strip() - lines = ps_txt.split("\n")[1:] - for line in lines: - if 'ceph-fuse' in line or 'ceph-mds' in line: - pid = int(line.split()[0]) - log.warn("Killing stray process {0}".format(line)) - os.kill(pid, signal.SIGKILL) - - # Fire up the Ceph cluster if the user requested it - if create_cluster: - log.info("Creating cluster with {0} MDS daemons".format( - max_required_mds)) - remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False) - remote.run(["rm", "-rf", "./out"]) - remote.run(["rm", "-rf", "./dev"]) - vstart_env = os.environ.copy() - vstart_env["FS"] = "0" - vstart_env["MDS"] = max_required_mds.__str__() - vstart_env["OSD"] = "1" - vstart_env["MGR"] = max(max_required_mgr, 1).__str__() - - remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"], - env=vstart_env) - - # Wait for OSD to come up so that subsequent injectargs etc will - # definitely succeed - LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30) - - # List of client mounts, sufficient to run the selected tests - clients = [i.__str__() for i in range(0, max_required_clients)] - - test_dir = tempfile.mkdtemp() - teuth_config['test_path'] = test_dir - - # Construct Mount classes - mounts = [] - for client_id in clients: - # Populate client keyring (it sucks to use client.admin for test clients - # because it's awkward to find the logs later) - client_name = "client.{0}".format(client_id) - - if client_name not in open("./keyring").read(): - p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name, - "osd", "allow rw", - "mds", "allow", - "mon", "allow r"]) - - open("./keyring", "a").write(p.stdout.getvalue()) - - mount = LocalFuseMount(test_dir, client_id) - mounts.append(mount) - if mount.is_mounted(): - log.warn("unmounting {0}".format(mount.mountpoint)) - mount.umount_wait() - else: - if os.path.exists(mount.mountpoint): - os.rmdir(mount.mountpoint) - - ctx = LocalContext() - ceph_cluster = LocalCephCluster(ctx) - mds_cluster = LocalMDSCluster(ctx) - mgr_cluster = LocalMgrCluster(ctx) - - from tasks.cephfs_test_runner import DecoratingLoader - - class LogStream(object): - def __init__(self): - self.buffer = "" - - def write(self, data): - self.buffer += data - if "\n" in self.buffer: - lines = self.buffer.split("\n") - for line in lines[:-1]: - pass - # sys.stderr.write(line + "\n") - log.info(line) - self.buffer = lines[-1] - - def flush(self): - pass - - decorating_loader = DecoratingLoader({ - "ctx": ctx, - "mounts": mounts, - "ceph_cluster": ceph_cluster, - "mds_cluster": mds_cluster, - "mgr_cluster": mgr_cluster, - }) - - # For the benefit of polling tests like test_full -- in teuthology land we set this - # in a .yaml, here it's just a hardcoded thing for the developer's pleasure. - remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"]) - ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5") - - # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning - # from normal IO latency. Increase it for running teests. - ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10") - - # Make sure the filesystem created in tests has uid/gid that will let us talk to - # it after mounting it (without having to go root). Set in 'global' not just 'mds' - # so that cephfs-data-scan will pick it up too. - ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid()) - ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid()) - - # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on - def _get_package_version(remote, pkg_name): - # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right? - return "2.9" - - import teuthology.packaging - teuthology.packaging.get_package_version = _get_package_version - - overall_suite = load_tests(modules, decorating_loader) - - # Filter out tests that don't lend themselves to interactive running, - victims = [] - for case, method in enumerate_methods(overall_suite): - fn = getattr(method, method._testMethodName) - - drop_test = False - - if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True: - drop_test = True - log.warn("Dropping test because long running: ".format(method.id())) - - if getattr(fn, "needs_trimming", False) is True: - drop_test = (os.getuid() != 0) - log.warn("Dropping test because client trim unavailable: ".format(method.id())) - - if drop_test: - # Don't drop the test if it was explicitly requested in arguments - is_named = False - for named in modules: - if named.endswith(method.id()): - is_named = True - break - - if not is_named: - victims.append((case, method)) - - log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims))) - for s, method in victims: - s._tests.remove(method) - - if interactive_on_error: - result_class = InteractiveFailureResult - else: - result_class = unittest.TextTestResult - fail_on_skip = False - - class LoggingResult(result_class): - def startTest(self, test): - log.info("Starting test: {0}".format(self.getDescription(test))) - test.started_at = datetime.datetime.utcnow() - return super(LoggingResult, self).startTest(test) - - def stopTest(self, test): - log.info("Stopped test: {0} in {1}s".format( - self.getDescription(test), - (datetime.datetime.utcnow() - test.started_at).total_seconds() - )) - - def addSkip(self, test, reason): - if fail_on_skip: - # Don't just call addFailure because that requires a traceback - self.failures.append((test, reason)) - else: - super(LoggingResult, self).addSkip(test, reason) - - # Execute! - result = unittest.TextTestRunner( - stream=LogStream(), - resultclass=LoggingResult, - verbosity=2, - failfast=True).run(overall_suite) - - if not result.wasSuccessful(): - result.printErrors() # duplicate output at end for convenience - - bad_tests = [] - for test, error in result.errors: - bad_tests.append(str(test)) - for test, failure in result.failures: - bad_tests.append(str(test)) - - sys.exit(-1) - else: - sys.exit(0) - - -if __name__ == "__main__": - exec_test()