Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / cephfs / fuse_mount.py
1
2 from StringIO import StringIO
3 import json
4 import time
5 import logging
6 from textwrap import dedent
7
8 from teuthology import misc
9 from teuthology.contextutil import MaxWhileTries
10 from teuthology.orchestra import run
11 from teuthology.orchestra.run import CommandFailedError
12 from .mount import CephFSMount
13
14 log = logging.getLogger(__name__)
15
16
class FuseMount(CephFSMount):
    """
    A CephFS client mount backed by the userspace ceph-fuse daemon, driven
    over a teuthology remote.

    Beyond the plain mount/umount lifecycle, this class records the FUSE
    connection number that appears under /sys/fs/fuse/connections when the
    daemon starts, so that a hung mount can later be force-aborted by
    writing "1" to that connection's ``abort`` file.
    """
    def __init__(self, client_config, test_dir, client_id, client_remote):
        """
        :param client_config: dict of client options, or None for defaults.
            Keys read here: 'coverage', 'valgrind', 'mount_wait',
            'mount_timeout'.
        :param test_dir: teuthology test/archive directory on the remote
        :param client_id: ceph client id (the X in "client.X")
        :param client_remote: teuthology remote on which to run commands
        """
        super(FuseMount, self).__init__(test_dir, client_id, client_remote)

        self.client_config = client_config if client_config else {}
        # Handle of the running ceph-fuse daemon process (set by _mount);
        # closing its stdin is the conventional way to ask it to terminate.
        self.fuse_daemon = None
        # Integer id of our entry under /sys/fs/fuse/connections (set by
        # _mount), used by umount() to force-abort a stuck mount.
        self._fuse_conn = None

    def mount(self, mount_path=None, mount_fs_name=None):
        """
        Mount ceph-fuse at ``self.mountpoint``, tearing the mount back down
        if our own mount logic fails partway through.

        :param mount_path: optional path within CephFS to use as the client
            mountpoint (passed as --client_mountpoint)
        :param mount_fs_name: optional filesystem name (passed as
            --client_mds_namespace)
        :raises RuntimeError: re-raised from _mount after forced cleanup
        """
        try:
            return self._mount(mount_path, mount_fs_name)
        except RuntimeError:
            # Catch exceptions by the mount() logic (i.e. not remote command
            # failures) and ensure the mount is not left half-up.
            # Otherwise we might leave a zombie mount point that causes
            # anyone traversing cephtest/ to get hung up on.
            log.warn("Trying to clean up after failed mount")
            self.umount_wait(force=True)
            raise

    def _mount(self, mount_path, mount_fs_name):
        """
        Do the real work of mount(): create the mountpoint directory, start
        ceph-fuse (optionally under valgrind), then wait until a new FUSE
        connection shows up in /sys/fs/fuse/connections and remember its id.

        :raises RuntimeError: if no new FUSE connection appears within
            'mount_timeout' seconds (default 30), or the set of new
            connections is ambiguous
        """
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        # Under coverage or valgrind the daemon must shut down gracefully so
        # its instrumentation data is flushed; use SIGTERM instead of SIGKILL.
        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
        )

        # Wrapper stack: ulimit adjustment, coverage collection, and
        # daemon-helper to deliver daemon_signal on teardown.
        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

        # -f keeps ceph-fuse in the foreground so we hold its process handle.
        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]

        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )

        run_cmd.extend(fuse_cmd)

        def list_connections():
            # Return the list of FUSE connection ids currently visible in
            # /sys/fs/fuse/connections, mounting fusectl first (best-effort:
            # it may already be mounted, hence check_status=False).
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                stdout=StringIO(),
                check_status=False
            )
            if p.exitstatus != 0:
                # Directory missing/unreadable: treat as "no connections".
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        proc = self.client_remote.run(
            args=run_cmd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        # Poll until at least one connection beyond the pre-mount set shows
        # up, giving up after `timeout` seconds.
        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            else:
                post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            # More than one new connection: we cannot tell which one is ours.
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

    def is_mounted(self):
        """
        Return True if a FUSE filesystem is mounted at self.mountpoint
        (including a stale mount left behind by a killed daemon), False
        otherwise.  Probes via `stat --file-system` on the remote.
        """
        proc = self.client_remote.run(
            args=[
                'stat',
                '--file-system',
                '--printf=%T\n',
                '--',
                self.mountpoint,
            ],
            stdout=StringIO(),
            stderr=StringIO(),
            wait=False
        )
        try:
            proc.wait()
        except CommandFailedError:
            if ("endpoint is not connected" in proc.stderr.getvalue()
            or "Software caused connection abort" in proc.stderr.getvalue()):
                # This happens if fuse is killed without unmount
                log.warn("Found stale moutn point at {0}".format(self.mountpoint))
                return True
            else:
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.mountpoint)
                return False

        # stat reports the filesystem type name; a live ceph-fuse mount
        # shows up as 'fuseblk'.
        fstype = proc.stdout.getvalue().rstrip('\n')
        if fstype == 'fuseblk':
            log.info('ceph-fuse is mounted on %s', self.mountpoint)
            return True
        else:
            log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
                fstype=fstype))
            return False

    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint.  If not,
        sleep for 5 seconds and check again.
        """

        while not self.is_mounted():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

            time.sleep(5)

        # Now that we're mounted, set permissions so that the rest of the test will have
        # unrestricted access to the filesystem mount.
        self.client_remote.run(
            args=['sudo', 'chmod', '1777', self.mountpoint])

    def _mountpoint_exists(self):
        # True if the mountpoint directory exists on the remote (`ls -d`
        # exits 0), without raising on failure.
        return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False).exitstatus == 0

    def umount(self):
        """
        Unmount the filesystem.  Tries a polite `fusermount -u` first; on
        failure, logs process state for diagnosis, force-aborts our FUSE
        connection via its /sys abort file if we know it, then does a lazy
        forced `umount -l -f`.  Asserts the mount is gone afterwards.
        """
        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            # Dump open files and the process tree to help debug what is
            # pinning the mount.
            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps',
                'auxf',
            ])

            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
                self._fuse_conn = None

            stderr = StringIO()
            try:
                # make sure its unmounted
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                    stderr=stderr
                )
            except CommandFailedError:
                # Tolerate the failure only if the mount is actually gone.
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None

    def umount_wait(self, force=False, require_clean=False):
        """
        Unmount, wait for the daemon to exit, and remove the mountpoint.

        :param force: Complete cleanly even if the MDS is offline
        :param require_clean: re-raise if the daemon exited with an error;
            mutually exclusive with force
        """
        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        self.umount()

        try:
            if self.fuse_daemon:
                # Permit a timeout, so that we do not block forever
                run.wait([self.fuse_daemon], 900)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount.  This probably"
                      "indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            # Non-zero daemon exit is tolerated unless the caller demanded a
            # clean shutdown.
            if require_clean:
                raise

        self.cleanup()

    def cleanup(self):
        """
        Remove the mount point.

        Prerequisite: the client is not mounted.
        """
        stderr = StringIO()
        try:
            self.client_remote.run(
                args=[
                    'rmdir',
                    '--',
                    self.mountpoint,
                ],
                stderr=stderr
            )
        except CommandFailedError:
            # Already-removed mountpoint is fine; anything else is real.
            if "No such file or directory" in stderr.getvalue():
                pass
            else:
                raise

    def kill(self):
        """
        Terminate the client without removing the mount point.
        """
        # Closing the daemon's stdin signals it to exit; a resulting
        # non-zero exit status is expected and ignored.
        self.fuse_daemon.stdin.close()
        try:
            self.fuse_daemon.wait()
        except CommandFailedError:
            pass

    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        self.umount()
        self.cleanup()

    def teardown(self):
        """
        Whatever the state of the mount, get it gone.
        """
        super(FuseMount, self).teardown()

        self.umount()

        # If the daemon is somehow still running after umount, ask it to
        # exit and reap it, tolerating a non-zero exit status.
        if self.fuse_daemon and not self.fuse_daemon.finished:
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=[
                'rm',
                '-rf',
                self.mountpoint,
            ],
        )

    def _asok_path(self):
        # Glob pattern for this client's admin socket; the wildcard covers
        # the daemon pid embedded in the filename.
        return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)

    @property
    def _prefix(self):
        # Command prefix prepended to the `ceph` binary in admin_socket();
        # empty for the plain ceph-fuse mount (subclasses may override —
        # TODO confirm against the rest of the project).
        return ""

    def admin_socket(self, args):
        """
        Run a `ceph --admin-daemon` command against this client's admin
        socket and return the parsed JSON result.

        The socket path is discovered on the remote by globbing
        _asok_path() and picking the socket whose embedded pid is alive
        (checked via /proc/<pid>).

        :param args: list of admin socket command words, e.g. ['status']
        :return: decoded JSON response
        """
        pyscript = """
import glob
import re
import os
import subprocess

def find_socket(client_name):
        asok_path = "{asok_path}"
        files = glob.glob(asok_path)

        # Given a non-glob path, it better be there
        if "*" not in asok_path:
            assert(len(files) == 1)
            return files[0]

        for f in files:
                pid = re.match(".*\.(\d+)\.asok$", f).group(1)
                if os.path.exists("/proc/{{0}}".format(pid)):
                        return f
        raise RuntimeError("Client socket {{0}} not found".format(client_name))

print find_socket("{client_name}")
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        p = self.client_remote.run(args=[
            'python', '-c', pyscript
        ], stdout=StringIO())
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Query client ID from admin socket
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO())
        return json.loads(p.stdout.getvalue())

    def get_global_id(self):
        """
        Look up the CephFS client ID for this mount
        """

        return self.admin_socket(['mds_sessions'])['id']

    def get_osd_epoch(self):
        """
        Return 2-tuple of osd_epoch, osd_epoch_barrier
        """
        status = self.admin_socket(['status'])
        return status['osd_epoch'], status['osd_epoch_barrier']

    def get_dentry_count(self):
        """
        Return 2-tuple of dentry_count, dentry_pinned_count
        """
        status = self.admin_socket(['status'])
        return status['dentry_count'], status['dentry_pinned_count']

    def set_cache_size(self, size):
        """
        Set the client_cache_size config option via the admin socket.

        :param size: new cache size; converted to str for the command
        :return: decoded JSON response from the admin socket
        """
        return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])