1 from contextlib import contextmanager
6 from textwrap import dedent
8 from StringIO import StringIO
9 from teuthology.orchestra import run
10 from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
12 log = logging.getLogger(__name__)
15 class CephFSMount(object):
16 def __init__(self, test_dir, client_id, client_remote):
18 :param test_dir: Global teuthology test dir
19 :param client_id: Client ID, the 'foo' in client.foo
20 :param client_remote: Remote instance for the host where client will run
23 self.test_dir = test_dir
24 self.client_id = client_id
25 self.client_remote = client_remote
26 self.mountpoint_dir_name = 'mnt.{id}'.format(id=self.client_id)
28 self.test_files = ['a', 'b', 'c']
30 self.background_procs = []
35 self.test_dir, '{dir_name}'.format(dir_name=self.mountpoint_dir_name))
38 raise NotImplementedError()
def mount(self, mount_path=None, mount_fs_name=None):
    """
    Mount the filesystem on the client. No implementation is provided
    here: calling this always raises.

    :param mount_path: not used here (NOTE(review): its meaning is set by
                       whichever implementation overrides this method)
    :param mount_fs_name: not used here (same caveat as mount_path)
    :raises NotImplementedError: always
    """
    raise NotImplementedError()
44 raise NotImplementedError()
def umount_wait(self, force=False, require_clean=False):
    """
    Interface placeholder: no implementation is provided here, calling
    this always raises.

    :param force: Expect that the mount will not shutdown cleanly.
    :param require_clean: Wait for the Ceph client associated with the
                          mount (e.g. ceph-fuse) to terminate, and
                          raise if it doesn't do so cleanly.
    :raises NotImplementedError: always
    """
    raise NotImplementedError()
def kill_cleanup(self):
    """
    Interface placeholder: no implementation is provided here.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
62 raise NotImplementedError()
65 raise NotImplementedError()
def wait_until_mounted(self):
    """
    Interface placeholder: no implementation is provided here.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
def get_keyring_path(self):
    """
    Build the keyring path for this client from its client ID.

    :return: string of the form /etc/ceph/ceph.client.<id>.keyring
    """
    keyring_template = '/etc/ceph/ceph.client.{id}.keyring'
    return keyring_template.format(id=self.client_id)
def config_path(self):
    """
    Path to ceph.conf: override this if you're not a normal systemwide
    ceph install

    :return: string
    """
    systemwide_conf = "/etc/ceph/ceph.conf"
    return systemwide_conf
84 A context manager, from an initially unmounted state, to mount
85 this, yield, and then unmount and clean up.
88 self.wait_until_mounted()
def create_files(self):
    """
    Create every file named in self.test_files directly under the
    mountpoint, by running ``sudo touch`` on the client remote.

    Asserts that the client is mounted first.
    """
    assert(self.is_mounted())

    for basename in self.test_files:
        log.info("Creating file {0}".format(basename))
        touch_cmd = ['sudo', 'touch', os.path.join(self.mountpoint, basename)]
        self.client_remote.run(args=touch_cmd)
def check_files(self):
    """
    Check that every file named in self.test_files can be listed under
    the mountpoint with ``sudo ls`` on the client remote.

    Asserts that the client is mounted first.

    :raises RuntimeError: on the first file whose ``ls`` exits non-zero
    """
    assert(self.is_mounted())

    for basename in self.test_files:
        log.info("Checking file {0}".format(basename))
        ls_cmd = ['sudo', 'ls', os.path.join(self.mountpoint, basename)]
        # check_status=False: the exit status is inspected here instead
        result = self.client_remote.run(args=ls_cmd, check_status=False)
        if result.exitstatus == 0:
            continue
        raise RuntimeError("Expected file {0} not found".format(basename))
def create_destroy(self):
    """
    Touch and then remove (``rm -f``) a single file under the mountpoint.

    The file name is built from the current timestamp and this client's
    ID. Asserts that the client is mounted first.
    """
    assert(self.is_mounted())

    filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)

    log.debug("Creating test file {0}".format(filename))
    touch_cmd = ['sudo', 'touch', os.path.join(self.mountpoint, filename)]
    self.client_remote.run(args=touch_cmd)

    log.debug("Deleting test file {0}".format(filename))
    rm_cmd = ['sudo', 'rm', '-f', os.path.join(self.mountpoint, filename)]
    self.client_remote.run(args=rm_cmd)
def _run_python(self, pyscript):
    """
    Launch ``python -c <pyscript>`` on the client remote without waiting
    for it to finish.

    :param pyscript: source text handed to ``python -c``
    :return: whatever ``client_remote.run`` returns for ``wait=False``;
             its stdout is collected into a fresh StringIO
    """
    command = [
        'sudo', 'adjust-ulimits', 'daemon-helper', 'kill',
        'python', '-c', pyscript,
    ]
    return self.client_remote.run(
        args=command,
        wait=False,
        stdin=run.PIPE,
        stdout=StringIO(),
    )
def run_python(self, pyscript):
    """
    Run a python script on the client and return what it printed.

    :param pyscript: source text passed to ``_run_python``
    :return: the script's captured stdout, stripped of surrounding
             whitespace
    """
    proc = self._run_python(pyscript)
    # _run_python launches with wait=False, so the captured stdout is only
    # complete once the process has been waited for.
    proc.wait()
    return proc.stdout.getvalue().strip()
def run_shell(self, args, wait=True):
    """
    Run a command under sudo on the client remote, after changing
    directory to the mountpoint.

    :param args: list of command words, appended after ``sudo``
    :param wait: passed through to ``client_remote.run``
    :return: whatever ``client_remote.run`` returns; stdout and stderr
             are each collected into a fresh StringIO
    """
    # cd <mountpoint> && sudo <args...>
    prefix = ["cd", self.mountpoint, run.Raw('&&'), "sudo"]
    full_cmd = prefix + args
    out_buf, err_buf = StringIO(), StringIO()
    return self.client_remote.run(
        args=full_cmd, stdout=out_buf, stderr=err_buf, wait=wait)
def open_no_data(self, basename):
    """
    A pure metadata operation: open a file under the mountpoint for
    writing from a python process on the client, without writing
    anything to it.

    Asserts that the client is mounted first.

    :param basename: file name, joined onto the mountpoint
    """
    assert(self.is_mounted())

    path = os.path.join(self.mountpoint, basename)

    pyscript = dedent(
        """
        f = open("{path}", 'w')
        """.format(path=path)
    )
    p = self._run_python(pyscript)
    # _run_python launches with wait=False; wait here so the open has been
    # attempted by the time this method returns.
    p.wait()
157 def open_background(self, basename="background_file"):
159 Open a file for writing, then block such that the client
160 will hold a capability.
162 Don't return until the remote process has got as far as opening
163 the file, then return the RemoteProcess instance.
165 assert(self.is_mounted())
167 path = os.path.join(self.mountpoint, basename)
169 pyscript = dedent("""
172 f = open("{path}", 'w')
178 """).format(path=path)
180 rproc = self._run_python(pyscript)
181 self.background_procs.append(rproc)
183 # This wait would not be sufficient if the file had already
184 # existed, but it's simple and in practice users of open_background
185 # are not using it on existing files.
186 self.wait_for_visible(basename)
190 def wait_for_visible(self, basename="background_file", timeout=30):
193 r = self.client_remote.run(args=[
194 'sudo', 'ls', os.path.join(self.mountpoint, basename)
195 ], check_status=False)
196 if r.exitstatus == 0:
197 log.debug("File {0} became visible from {1} after {2}s".format(
198 basename, self.client_id, i))
204 raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format(
205 i, basename, self.client_id))
207 def lock_background(self, basename="background_file", do_flock=True):
209 Open and lock a files for writing, hold the lock in a background process
211 assert(self.is_mounted())
213 path = os.path.join(self.mountpoint, basename)
220 script_builder += """
221 f1 = open("{path}-1", 'w')
222 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)"""
223 script_builder += """
224 f2 = open("{path}-2", 'w')
225 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
226 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
231 pyscript = dedent(script_builder).format(path=path)
233 log.info("lock_background file {0}".format(basename))
234 rproc = self._run_python(pyscript)
235 self.background_procs.append(rproc)
238 def lock_and_release(self, basename="background_file"):
239 assert(self.is_mounted())
241 path = os.path.join(self.mountpoint, basename)
247 f1 = open("{path}-1", 'w')
248 fcntl.flock(f1, fcntl.LOCK_EX)
249 f2 = open("{path}-2", 'w')
250 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
251 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
253 pyscript = dedent(script).format(path=path)
255 log.info("lock_and_release file {0}".format(basename))
256 return self._run_python(pyscript)
258 def check_filelock(self, basename="background_file", do_flock=True):
259 assert(self.is_mounted())
261 path = os.path.join(self.mountpoint, basename)
268 script_builder += """
269 f1 = open("{path}-1", 'r')
271 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)
273 if e.errno == errno.EAGAIN:
276 raise RuntimeError("flock on file {path}-1 not found")"""
277 script_builder += """
278 f2 = open("{path}-2", 'r')
280 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
281 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
283 if e.errno == errno.EAGAIN:
286 raise RuntimeError("posix lock on file {path}-2 not found")
288 pyscript = dedent(script_builder).format(path=path)
290 log.info("check lock on file {0}".format(basename))
291 self.client_remote.run(args=[
292 'sudo', 'python', '-c', pyscript
295 def write_background(self, basename="background_file", loop=False):
297 Open a file for writing, complete as soon as you can
301 assert(self.is_mounted())
303 path = os.path.join(self.mountpoint, basename)
305 pyscript = dedent("""
309 fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0644)
312 os.write(fd, 'content')
319 """).format(path=path, loop=str(loop))
321 rproc = self._run_python(pyscript)
322 self.background_procs.append(rproc)
def write_n_mb(self, filename, n_mb, seek=0, wait=True):
    """
    Write the requested number of megabytes to a file

    Runs ``dd`` from /dev/urandom in 1M blocks with ``conv=fdatasync``
    through run_shell. Asserts that the client is mounted first.

    :param filename: value for dd's ``of=`` (run_shell executes from the
                     mountpoint directory)
    :param n_mb: value for dd's ``count=``
    :param seek: value for dd's ``seek=``
    :param wait: forwarded to run_shell
    :return: whatever run_shell returns
    """
    assert(self.is_mounted())

    dd_cmd = [
        "dd", "if=/dev/urandom", "of={0}".format(filename),
        "bs=1M", "conv=fdatasync",
        "count={0}".format(n_mb),
        "seek={0}".format(seek),
    ]
    return self.run_shell(dd_cmd, wait=wait)
337 def write_test_pattern(self, filename, size):
338 log.info("Writing {0} bytes to {1}".format(size, filename))
339 return self.run_python(dedent("""
343 for i in range(0, {size}):
344 val = zlib.crc32("%s" % i) & 7
348 path=os.path.join(self.mountpoint, filename),
352 def validate_test_pattern(self, filename, size):
353 log.info("Validating {0} bytes from {1}".format(size, filename))
354 return self.run_python(dedent("""
360 if len(bytes) != {size}:
361 raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format(
364 for i, b in enumerate(bytes):
365 val = zlib.crc32("%s" % i) & 7
367 raise RuntimeError("Bad data at offset {{0}}".format(i))
369 path=os.path.join(self.mountpoint, filename),
373 def open_n_background(self, fs_path, count):
375 Open N files for writing, hold them open in a background process
377 :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
378 :return: a RemoteProcess
380 assert(self.is_mounted())
382 abs_path = os.path.join(self.mountpoint, fs_path)
384 pyscript = dedent("""
390 abs_path = "{abs_path}"
392 if not os.path.exists(os.path.dirname(abs_path)):
393 os.makedirs(os.path.dirname(abs_path))
396 for i in range(0, n):
397 fname = "{{0}}_{{1}}".format(abs_path, i)
398 handles.append(open(fname, 'w'))
402 """).format(abs_path=abs_path, count=count)
404 rproc = self._run_python(pyscript)
405 self.background_procs.append(rproc)
408 def create_n_files(self, fs_path, count, sync=False):
409 assert(self.is_mounted())
411 abs_path = os.path.join(self.mountpoint, fs_path)
413 pyscript = dedent("""
419 abs_path = "{abs_path}"
421 if not os.path.exists(os.path.dirname(abs_path)):
422 os.makedirs(os.path.dirname(abs_path))
424 for i in range(0, n):
425 fname = "{{0}}_{{1}}".format(abs_path, i)
432 """).format(abs_path=abs_path, count=count, sync=str(sync))
434 self.run_python(pyscript)
437 for p in self.background_procs:
438 log.info("Terminating background process")
439 self._kill_background(p)
441 self.background_procs = []
443 def _kill_background(self, p):
448 except (CommandFailedError, ConnectionLostError):
def kill_background(self, p):
    """
    For a process that was returned by one of the _background member
    functions: kill it via _kill_background, then drop it from
    self.background_procs.

    :param p: the process to kill; must be present in
              self.background_procs
    """
    self._kill_background(p)
    tracked = self.background_procs
    tracked.remove(p)
def get_global_id(self):
    """
    Interface placeholder: no implementation is provided here.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
def get_osd_epoch(self):
    """
    Interface placeholder: no implementation is provided here.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
465 def stat(self, fs_path, wait=True):
467 stat a file, and return the result as a dictionary like this:
469 "st_ctime": 1414161137.0,
470 "st_mtime": 1414161137.0,
478 "st_atime": 1431520593.0
481 Raises exception on absent file.
483 abs_path = os.path.join(self.mountpoint, fs_path)
485 pyscript = dedent("""
492 s = os.stat("{path}")
496 attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]
498 dict([(a, getattr(s, a)) for a in attrs]),
500 """).format(path=abs_path)
501 proc = self._run_python(pyscript)
504 return json.loads(proc.stdout.getvalue().strip())
508 def touch(self, fs_path):
510 Create a dentry if it doesn't already exist. This python
511 implementation exists because the usual command line tool doesn't
512 pass through error codes like EIO.
517 abs_path = os.path.join(self.mountpoint, fs_path)
518 pyscript = dedent("""
523 f = open("{path}", "w")
527 """).format(path=abs_path)
528 proc = self._run_python(pyscript)
def path_to_ino(self, fs_path, follow_symlinks=True):
    """
    Return the inode number of a path, as reported by a python process
    run on the client.

    :param fs_path: path joined onto the mountpoint
    :param follow_symlinks: when true the remote script calls os.stat,
                            otherwise os.lstat
    :return: the inode number as an int
    """
    abs_path = os.path.join(self.mountpoint, fs_path)

    # One remote script serves both cases; only the stat call differs.
    stat_call = "stat" if follow_symlinks else "lstat"
    pyscript = dedent("""
        import os

        print os.{stat_call}("{path}").st_ino
        """).format(stat_call=stat_call, path=abs_path)

    proc = self._run_python(pyscript)
    # _run_python launches with wait=False, so the captured stdout is only
    # complete once the process has been waited for.
    proc.wait()
    return int(proc.stdout.getvalue().strip())
def path_to_nlink(self, fs_path):
    """
    Return the link count (st_nlink) of a path, as reported by a python
    process run on the client.

    :param fs_path: path joined onto the mountpoint
    :return: the link count as an int
    """
    abs_path = os.path.join(self.mountpoint, fs_path)

    pyscript = dedent("""
        import os

        print os.stat("{path}").st_nlink
        """).format(path=abs_path)

    proc = self._run_python(pyscript)
    # _run_python launches with wait=False, so the captured stdout is only
    # complete once the process has been waited for.
    proc.wait()
    return int(proc.stdout.getvalue().strip())
567 def ls(self, path=None):
569 Wrap ls: return a list of strings
575 ls_text = self.run_shell(cmd).stdout.getvalue().strip()
578 return ls_text.split("\n")
580 # Special case because otherwise split on empty string
581 # gives you [''] instead of []
def setfattr(self, path, key, val):
    """
    Wrap setfattr: set one extended attribute on a file through
    run_shell.

    :param path: relative to mount point
    :param key: xattr name
    :param val: xattr value
    :return: None
    """
    setfattr_cmd = ["setfattr", "-n", key, "-v", val, path]
    self.run_shell(setfattr_cmd)
595 def getfattr(self, path, attr):
597 Wrap getfattr: return the values of a named xattr on one file, or
598 None if the attribute is not found.
602 p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False)
605 except CommandFailedError as e:
606 if e.exitstatus == 1 and "No such attribute" in p.stderr.getvalue():
611 return p.stdout.getvalue()
615 Wrap df: return a dict of usage fields in bytes
618 p = self.run_shell(["df", "-B1", "."])
619 lines = p.stdout.getvalue().strip().split("\n")
620 fs, total, used, avail = lines[1].split()[:4]
626 "available": int(avail)