Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / pybind / ceph_volume_client.py
1 """
2 Copyright (C) 2015 Red Hat, Inc.
3
4 LGPL2.  See file COPYING.
5 """
6
7 from contextlib import contextmanager
8 import errno
9 import fcntl
10 import json
11 import logging
12 import os
13 import re
14 import struct
15 import sys
16 import threading
17 import time
18 import uuid
19
20 from ceph_argparse import json_command
21
22 import cephfs
23 import rados
24
25
26 class RadosError(Exception):
27     """
28     Something went wrong talking to Ceph with librados
29     """
30     pass
31
32
33 RADOS_TIMEOUT = 10
34
35 log = logging.getLogger(__name__)
36
37
38 # Reserved volume group name which we use in paths for volumes
39 # that are not assigned to a group (i.e. created with group=None)
40 NO_GROUP_NAME = "_nogroup"
41
42 # Filename extensions for meta files.
43 META_FILE_EXT = ".meta"
44
45 class VolumePath(object):
46     """
47     Identify a volume's path as group->volume
48     The Volume ID is a unique identifier, but this is a much more
49     helpful thing to pass around.
50     """
51     def __init__(self, group_id, volume_id):
52         self.group_id = group_id
53         self.volume_id = volume_id
54         assert self.group_id != NO_GROUP_NAME
55         assert self.volume_id != "" and self.volume_id is not None
56
57     def __str__(self):
58         return "{0}/{1}".format(self.group_id, self.volume_id)
59
60
61 class ClusterTimeout(Exception):
62     """
63     Exception indicating that we timed out trying to talk to the Ceph cluster,
64     either to the mons, or to any individual daemon that the mons indicate ought
65     to be up but isn't responding to us.
66     """
67     pass
68
69
70 class ClusterError(Exception):
71     """
72     Exception indicating that the cluster returned an error to a command that
73     we thought should be successful based on our last knowledge of the cluster
74     state.
75     """
76     def __init__(self, action, result_code, result_str):
77         self._action = action
78         self._result_code = result_code
79         self._result_str = result_str
80
81     def __str__(self):
82         return "Error {0} (\"{1}\") while {2}".format(
83             self._result_code, self._result_str, self._action)
84
85
86 class RankEvicter(threading.Thread):
87     """
88     Thread for evicting client(s) from a particular MDS daemon instance.
89
90     This is more complex than simply sending a command, because we have to
91     handle cases where MDS daemons might not be fully up yet, and/or might
92     be transiently unresponsive to commands.
93     """
94     class GidGone(Exception):
95         pass
96
97     POLL_PERIOD = 5
98
99     def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
100         """
101         :param client_spec: list of strings, used as filter arguments to "session evict"
102                             pass ["id=123"] to evict a single client with session id 123.
103         """
104         self.rank = rank
105         self.gid = gid
106         self._mds_map = mds_map
107         self._client_spec = client_spec
108         self._volume_client = volume_client
109         self._ready_timeout = ready_timeout
110         self._ready_waited = 0
111
112         self.success = False
113         self.exception = None
114
115         super(RankEvicter, self).__init__()
116
117     def _ready_to_evict(self):
118         if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
119             log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
120                 self._client_spec, self.rank, self.gid
121             ))
122             raise RankEvicter.GidGone()
123
124         info = self._mds_map['info']["gid_{0}".format(self.gid)]
125         log.debug("_ready_to_evict: state={0}".format(info['state']))
126         return info['state'] in ["up:active", "up:clientreplay"]
127
128     def _wait_for_ready(self):
129         """
130         Wait for that MDS rank to reach an active or clientreplay state, and
131         not be laggy.
132         """
133         while not self._ready_to_evict():
134             if self._ready_waited > self._ready_timeout:
135                 raise ClusterTimeout()
136
137             time.sleep(self.POLL_PERIOD)
138             self._ready_waited += self.POLL_PERIOD
139
140             self._mds_map = self._volume_client._rados_command("mds dump", {})
141
142     def _evict(self):
143         """
144         Run the eviction procedure.  Return true on success, false on errors.
145         """
146
147         # Wait til the MDS is believed by the mon to be available for commands
148         try:
149             self._wait_for_ready()
150         except self.GidGone:
151             return True
152
153         # Then send it an evict
154         ret = errno.ETIMEDOUT
155         while ret == errno.ETIMEDOUT:
156             log.debug("mds_command: {0}, {1}".format(
157                 "%s" % self.gid, ["session", "evict"] + self._client_spec
158             ))
159             ret, outb, outs = self._volume_client.fs.mds_command(
160                 "%s" % self.gid,
161                 [json.dumps({
162                                 "prefix": "session evict",
163                                 "filters": self._client_spec
164                 })], "")
165             log.debug("mds_command: complete {0} {1}".format(ret, outs))
166
167             # If we get a clean response, great, it's gone from that rank.
168             if ret == 0:
169                 return True
170             elif ret == errno.ETIMEDOUT:
171                 # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
172                 self._mds_map = self._volume_client._rados_command("mds dump", {})
173                 try:
174                     self._wait_for_ready()
175                 except self.GidGone:
176                     return True
177             else:
178                 raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
179
180     def run(self):
181         try:
182             self._evict()
183         except Exception as e:
184             self.success = False
185             self.exception = e
186         else:
187             self.success = True
188
189
190 class EvictionError(Exception):
191     pass
192
193
194 class CephFSVolumeClientError(Exception):
195     """
196     Something went wrong talking to Ceph using CephFSVolumeClient.
197     """
198     pass
199
200
201 CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
202
203     CephFSVolumeClient Version History:
204
205     * 1 - Initial version
206     * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
207
208 """
209
210
211 class CephFSVolumeClient(object):
212     """
213     Combine libcephfs and librados interfaces to implement a
214     'Volume' concept implemented as a cephfs directory and
215     client capabilities which restrict mount access to this
216     directory.
217
218     Additionally, volumes may be in a 'Group'.  Conveniently,
219     volumes are a lot like manila shares, and groups are a lot
220     like manila consistency groups.
221
222     Refer to volumes with VolumePath, which specifies the
223     volume and group IDs (both strings).  The group ID may
224     be None.
225
226     In general, functions in this class are allowed raise rados.Error
227     or cephfs.Error exceptions in unexpected situations.
228     """
229
230     # Current version
231     version = 2
232
233     # Where shall we create our volumes?
234     POOL_PREFIX = "fsvolume_"
235     DEFAULT_VOL_PREFIX = "/volumes"
236     DEFAULT_NS_PREFIX = "fsvolumens_"
237
238     def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None, pool_ns_prefix=None):
239         self.fs = None
240         self.rados = None
241         self.connected = False
242         self.conf_path = conf_path
243         self.cluster_name = cluster_name
244         self.auth_id = auth_id
245         self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
246         self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
247         # For flock'ing in cephfs, I want a unique ID to distinguish me
248         # from any other manila-share services that are loading this module.
249         # We could use pid, but that's unnecessary weak: generate a
250         # UUID
251         self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0]
252
253         # TODO: version the on-disk structures
254
255     def recover(self):
256         # Scan all auth keys to see if they're dirty: if they are, they have
257         # state that might not have propagated to Ceph or to the related
258         # volumes yet.
259
260         # Important: we *always* acquire locks in the order auth->volume
261         # That means a volume can never be dirty without the auth key
262         # we're updating it with being dirty at the same time.
263
264         # First list the auth IDs that have potentially dirty on-disk metadata
265         log.debug("Recovering from partial auth updates (if any)...")
266
267         try:
268             dir_handle = self.fs.opendir(self.volume_prefix)
269         except cephfs.ObjectNotFound:
270             log.debug("Nothing to recover. No auth meta files.")
271             return
272
273         d = self.fs.readdir(dir_handle)
274         auth_ids = []
275
276         if not d:
277             log.debug("Nothing to recover. No auth meta files.")
278
279         while d:
280             # Identify auth IDs from auth meta filenames. The auth meta files
281             # are named as, "$<auth_id><meta filename extension>"
282             regex = "^\$(.*){0}$".format(re.escape(META_FILE_EXT))
283             match = re.search(regex, d.d_name)
284             if match:
285                 auth_ids.append(match.group(1))
286
287             d = self.fs.readdir(dir_handle)
288
289         self.fs.closedir(dir_handle)
290
291         # Key points based on ordering:
292         # * Anything added in VMeta is already added in AMeta
293         # * Anything added in Ceph is already added in VMeta
294         # * Anything removed in VMeta is already removed in Ceph
295         # * Anything removed in AMeta is already removed in VMeta
296
297         # Deauthorization: because I only update metadata AFTER the
298         # update of the next level down, I have the same ordering of
299         # -> things which exist in the AMeta should also exist
300         #    in the VMeta, should also exist in Ceph, and the same
301         #    recovery procedure that gets me consistent after crashes
302         #    during authorization will also work during deauthorization
303
304         # Now for each auth ID, check for dirty flag and apply updates
305         # if dirty flag is found
306         for auth_id in auth_ids:
307             with self._auth_lock(auth_id):
308                 auth_meta = self._auth_metadata_get(auth_id)
309                 if not auth_meta or not auth_meta['volumes']:
310                     # Clean up auth meta file
311                     self.fs.unlink(self._auth_metadata_path(auth_id))
312                     continue
313                 if not auth_meta['dirty']:
314                     continue
315                 self._recover_auth_meta(auth_id, auth_meta)
316
317         log.debug("Recovered from partial auth updates (if any).")
318
319     def _recover_auth_meta(self, auth_id, auth_meta):
320         """
321         Call me after locking the auth meta file.
322         """
323         remove_volumes = []
324
325         for volume, volume_data in auth_meta['volumes'].items():
326             if not volume_data['dirty']:
327                 continue
328
329             (group_id, volume_id) = volume.split('/')
330             group_id = group_id if group_id is not 'None' else None
331             volume_path = VolumePath(group_id, volume_id)
332             access_level = volume_data['access_level']
333
334             with self._volume_lock(volume_path):
335                 vol_meta = self._volume_metadata_get(volume_path)
336
337                 # No VMeta update indicates that there was no auth update
338                 # in Ceph either. So it's safe to remove corresponding
339                 # partial update in AMeta.
340                 if not vol_meta or auth_id not in vol_meta['auths']:
341                     remove_volumes.append(volume)
342                     continue
343
344                 want_auth = {
345                     'access_level': access_level,
346                     'dirty': False,
347                 }
348                 # VMeta update looks clean. Ceph auth update must have been
349                 # clean.
350                 if vol_meta['auths'][auth_id] == want_auth:
351                     continue
352
353                 readonly = True if access_level is 'r' else False
354                 self._authorize_volume(volume_path, auth_id, readonly)
355
356             # Recovered from partial auth updates for the auth ID's access
357             # to a volume.
358             auth_meta['volumes'][volume]['dirty'] = False
359             self._auth_metadata_set(auth_id, auth_meta)
360
361         for volume in remove_volumes:
362             del auth_meta['volumes'][volume]
363
364         if not auth_meta['volumes']:
365             # Clean up auth meta file
366             self.fs.unlink(self._auth_metadata_path(auth_id))
367             return
368
369         # Recovered from all partial auth updates for the auth ID.
370         auth_meta['dirty'] = False
371         self._auth_metadata_set(auth_id, auth_meta)
372
373
374     def evict(self, auth_id, timeout=30, volume_path=None):
375         """
376         Evict all clients based on the authorization ID and optionally based on
377         the volume path mounted.  Assumes that the authorization key has been
378         revoked prior to calling this function.
379
380         This operation can throw an exception if the mon cluster is unresponsive, or
381         any individual MDS daemon is unresponsive for longer than the timeout passed in.
382         """
383
384         client_spec = ["auth_name={0}".format(auth_id), ]
385         if volume_path:
386             client_spec.append("client_metadata.root={0}".
387                                format(self._get_path(volume_path)))
388
389         log.info("evict clients with {0}".format(', '.join(client_spec)))
390
391         mds_map = self._rados_command("mds dump", {})
392
393         up = {}
394         for name, gid in mds_map['up'].items():
395             # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
396             assert name.startswith("mds_")
397             up[int(name[4:])] = gid
398
399         # For all MDS ranks held by a daemon
400         # Do the parallelism in python instead of using "tell mds.*", because
401         # the latter doesn't give us per-mds output
402         threads = []
403         for rank, gid in up.items():
404             thread = RankEvicter(self, client_spec, rank, gid, mds_map,
405                                  timeout)
406             thread.start()
407             threads.append(thread)
408
409         for t in threads:
410             t.join()
411
412         log.info("evict: joined all")
413
414         for t in threads:
415             if not t.success:
416                 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
417                        format(', '.join(client_spec), t.rank, t.gid, t.exception)
418                       )
419                 log.error(msg)
420                 raise EvictionError(msg)
421
422     def _get_path(self, volume_path):
423         """
424         Determine the path within CephFS where this volume will live
425         :return: absolute path (string)
426         """
427         return os.path.join(
428             self.volume_prefix,
429             volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
430             volume_path.volume_id)
431
432     def _get_group_path(self, group_id):
433         if group_id is None:
434             raise ValueError("group_id may not be None")
435
436         return os.path.join(
437             self.volume_prefix,
438             group_id
439         )
440
441     def connect(self, premount_evict = None):
442         """
443
444         :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
445                                may want to use this to specify their own auth ID if they expect
446                                to be a unique instance and don't want to wait for caps to time
447                                out after failure of another instance of themselves.
448         """
449         log.debug("Connecting to RADOS with config {0}...".format(self.conf_path))
450         self.rados = rados.Rados(
451             name="client.{0}".format(self.auth_id),
452             clustername=self.cluster_name,
453             conffile=self.conf_path,
454             conf={}
455         )
456         self.rados.connect()
457
458         log.debug("Connection to RADOS complete")
459
460         log.debug("Connecting to cephfs...")
461         self.fs = cephfs.LibCephFS(rados_inst=self.rados)
462         log.debug("CephFS initializing...")
463         self.fs.init()
464         if premount_evict is not None:
465             log.debug("Premount eviction of {0} starting".format(premount_evict))
466             self.evict(premount_evict)
467             log.debug("Premount eviction of {0} completes".format(premount_evict))
468         log.debug("CephFS mounting...")
469         self.fs.mount()
470         log.debug("Connection to cephfs complete")
471
472         # Recover from partial auth updates due to a previous
473         # crash.
474         self.recover()
475
476     def get_mon_addrs(self):
477         log.info("get_mon_addrs")
478         result = []
479         mon_map = self._rados_command("mon dump")
480         for mon in mon_map['mons']:
481             ip_port = mon['addr'].split("/")[0]
482             result.append(ip_port)
483
484         return result
485
486     def disconnect(self):
487         log.info("disconnect")
488         if self.fs:
489             log.debug("Disconnecting cephfs...")
490             self.fs.shutdown()
491             self.fs = None
492             log.debug("Disconnecting cephfs complete")
493
494         if self.rados:
495             log.debug("Disconnecting rados...")
496             self.rados.shutdown()
497             self.rados = None
498             log.debug("Disconnecting rados complete")
499
500     def __del__(self):
501         self.disconnect()
502
503     def _get_pool_id(self, osd_map, pool_name):
504         # Maybe borrow the OSDMap wrapper class from calamari if more helpers
505         # like this are needed.
506         for pool in osd_map['pools']:
507             if pool['pool_name'] == pool_name:
508                 return pool['pool']
509
510         return None
511
512     def _create_volume_pool(self, pool_name):
513         """
514         Idempotently create a pool for use as a CephFS data pool, with the given name
515
516         :return The ID of the created pool
517         """
518         osd_map = self._rados_command('osd dump', {})
519
520         existing_id = self._get_pool_id(osd_map, pool_name)
521         if existing_id is not None:
522             log.info("Pool {0} already exists".format(pool_name))
523             return existing_id
524
525         osd_count = len(osd_map['osds'])
526
527         # We can't query the actual cluster config remotely, but since this is
528         # just a heuristic we'll assume that the ceph.conf we have locally reflects
529         # that in use in the rest of the cluster.
530         pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))
531
532         other_pgs = 0
533         for pool in osd_map['pools']:
534             if not pool['pool_name'].startswith(self.POOL_PREFIX):
535                 other_pgs += pool['pg_num']
536
537         # A basic heuristic for picking pg_num: work out the max number of
538         # PGs we can have without tripping a warning, then subtract the number
539         # of PGs already created by non-manila pools, then divide by ten.  That'll
540         # give you a reasonable result on a system where you have "a few" manila
541         # shares.
542         pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) / 10
543         # TODO Alternatively, respect an override set by the user.
544
545         self._rados_command(
546             'osd pool create',
547             {
548                 'pool': pool_name,
549                 'pg_num': pg_num
550             }
551         )
552
553         osd_map = self._rados_command('osd dump', {})
554         pool_id = self._get_pool_id(osd_map, pool_name)
555
556         if pool_id is None:
557             # If the pool isn't there, that's either a ceph bug, or it's some outside influence
558             # removing it right after we created it.
559             log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
560                 pool_name, json.dumps(osd_map, indent=2)
561             ))
562             raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
563         else:
564             return pool_id
565
566     def create_group(self, group_id):
567         # Prevent craftily-named volume groups from colliding with the meta
568         # files.
569         if group_id.endswith(META_FILE_EXT):
570             raise ValueError("group ID cannot end with '{0}'.".format(
571                 META_FILE_EXT))
572         path = self._get_group_path(group_id)
573         self._mkdir_p(path)
574
575     def destroy_group(self, group_id):
576         path = self._get_group_path(group_id)
577         try:
578             self.fs.stat(self.volume_prefix)
579         except cephfs.ObjectNotFound:
580             pass
581         else:
582             self.fs.rmdir(path)
583
584     def _mkdir_p(self, path):
585         try:
586             self.fs.stat(path)
587         except cephfs.ObjectNotFound:
588             pass
589         else:
590             return
591
592         parts = path.split(os.path.sep)
593
594         for i in range(1, len(parts) + 1):
595             subpath = os.path.join(*parts[0:i])
596             try:
597                 self.fs.stat(subpath)
598             except cephfs.ObjectNotFound:
599                 self.fs.mkdir(subpath, 0o755)
600
601     def create_volume(self, volume_path, size=None, data_isolated=False):
602         """
603         Set up metadata, pools and auth for a volume.
604
605         This function is idempotent.  It is safe to call this again
606         for an already-created volume, even if it is in use.
607
608         :param volume_path: VolumePath instance
609         :param size: In bytes, or None for no size limit
610         :param data_isolated: If true, create a separate OSD pool for this volume
611         :return:
612         """
613         path = self._get_path(volume_path)
614         log.info("create_volume: {0}".format(path))
615
616         self._mkdir_p(path)
617
618         if size is not None:
619             self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0)
620
621         # data_isolated means create a separate pool for this volume
622         if data_isolated:
623             pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
624             log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name))
625             pool_id = self._create_volume_pool(pool_name)
626             mds_map = self._rados_command("mds dump", {})
627             if pool_id not in mds_map['data_pools']:
628                 self._rados_command("mds add_data_pool", {
629                     'pool': pool_name
630                 })
631             self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0)
632
633         # enforce security isolation, use seperate namespace for this volume
634         namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
635         log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
636         self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', namespace, 0)
637
638         # Create a volume meta file, if it does not already exist, to store
639         # data about auth ids having access to the volume
640         fd = self.fs.open(self._volume_metadata_path(volume_path),
641                           os.O_CREAT, 0o755)
642         self.fs.close(fd)
643
644         return {
645             'mount_path': path
646         }
647
648     def delete_volume(self, volume_path, data_isolated=False):
649         """
650         Make a volume inaccessible to guests.  This function is
651         idempotent.  This is the fast part of tearing down a volume: you must
652         also later call purge_volume, which is the slow part.
653
654         :param volume_path: Same identifier used in create_volume
655         :return:
656         """
657
658         path = self._get_path(volume_path)
659         log.info("delete_volume: {0}".format(path))
660
661         # Create the trash folder if it doesn't already exist
662         trash = os.path.join(self.volume_prefix, "_deleting")
663         self._mkdir_p(trash)
664
665         # We'll move it to here
666         trashed_volume = os.path.join(trash, volume_path.volume_id)
667
668         # Move the volume's data to the trash folder
669         try:
670             self.fs.stat(path)
671         except cephfs.ObjectNotFound:
672             log.warning("Trying to delete volume '{0}' but it's already gone".format(
673                 path))
674         else:
675             self.fs.rename(path, trashed_volume)
676
677         # Delete the volume meta file, if it's not already deleted
678         vol_meta_path = self._volume_metadata_path(volume_path)
679         try:
680             self.fs.unlink(vol_meta_path)
681         except cephfs.ObjectNotFound:
682             pass
683
684     def purge_volume(self, volume_path, data_isolated=False):
685         """
686         Finish clearing up a volume that was previously passed to delete_volume.  This
687         function is idempotent.
688         """
689
690         trash = os.path.join(self.volume_prefix, "_deleting")
691         trashed_volume = os.path.join(trash, volume_path.volume_id)
692
693         try:
694             self.fs.stat(trashed_volume)
695         except cephfs.ObjectNotFound:
696             log.warning("Trying to purge volume '{0}' but it's already been purged".format(
697                 trashed_volume))
698             return
699
700         def rmtree(root_path):
701             log.debug("rmtree {0}".format(root_path))
702             dir_handle = self.fs.opendir(root_path)
703             d = self.fs.readdir(dir_handle)
704             while d:
705                 if d.d_name not in [".", ".."]:
706                     # Do not use os.path.join because it is sensitive
707                     # to string encoding, we just pass through dnames
708                     # as byte arrays
709                     d_full = "{0}/{1}".format(root_path, d.d_name)
710                     if d.is_dir():
711                         rmtree(d_full)
712                     else:
713                         self.fs.unlink(d_full)
714
715                 d = self.fs.readdir(dir_handle)
716             self.fs.closedir(dir_handle)
717
718             self.fs.rmdir(root_path)
719
720         rmtree(trashed_volume)
721
722         if data_isolated:
723             pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
724             osd_map = self._rados_command("osd dump", {})
725             pool_id = self._get_pool_id(osd_map, pool_name)
726             mds_map = self._rados_command("mds dump", {})
727             if pool_id in mds_map['data_pools']:
728                 self._rados_command("mds remove_data_pool", {
729                     'pool': pool_name
730                 })
731             self._rados_command("osd pool delete",
732                                 {
733                                     "pool": pool_name,
734                                     "pool2": pool_name,
735                                     "sure": "--yes-i-really-really-mean-it"
736                                 })
737
738     def _get_ancestor_xattr(self, path, attr):
739         """
740         Helper for reading layout information: if this xattr is missing
741         on the requested path, keep checking parents until we find it.
742         """
743         try:
744             result = self.fs.getxattr(path, attr)
745             if result == "":
746                 # Annoying!  cephfs gives us empty instead of an error when attr not found
747                 raise cephfs.NoData()
748             else:
749                 return result
750         except cephfs.NoData:
751             if path == "/":
752                 raise
753             else:
754                 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
755
756     def _check_compat_version(self, compat_version):
757         if self.version < compat_version:
758             msg = ("The current version of CephFSVolumeClient, version {0} "
759                    "does not support the required feature. Need version {1} "
760                    "or greater".format(self.version, compat_version)
761                   )
762             log.error(msg)
763             raise CephFSVolumeClientError(msg)
764
765     def _metadata_get(self, path):
766         """
767         Return a deserialized JSON object, or None
768         """
769         fd = self.fs.open(path, "r")
770         # TODO iterate instead of assuming file < 4MB
771         read_bytes = self.fs.read(fd, 0, 4096 * 1024)
772         self.fs.close(fd)
773         if read_bytes:
774             return json.loads(read_bytes)
775         else:
776             return None
777
778     def _metadata_set(self, path, data):
779         serialized = json.dumps(data)
780         fd = self.fs.open(path, "w")
781         try:
782             self.fs.write(fd, serialized, 0)
783             self.fs.fsync(fd, 0)
784         finally:
785             self.fs.close(fd)
786
787     def _lock(self, path):
788         @contextmanager
789         def fn():
790             while(1):
791                 fd = self.fs.open(path, os.O_CREAT, 0o755)
792                 self.fs.flock(fd, fcntl.LOCK_EX, self._id)
793
794                 # The locked file will be cleaned up sometime. It could be
795                 # unlinked e.g., by an another manila share instance, before
796                 # lock was applied on it. Perform checks to ensure that this
797                 # does not happen.
798                 try:
799                     statbuf = self.fs.stat(path)
800                 except cephfs.ObjectNotFound:
801                     self.fs.close(fd)
802                     continue
803
804                 fstatbuf = self.fs.fstat(fd)
805                 if statbuf.st_ino == fstatbuf.st_ino:
806                     break
807
808             try:
809                 yield
810             finally:
811                 self.fs.flock(fd, fcntl.LOCK_UN, self._id)
812                 self.fs.close(fd)
813
814         return fn()
815
816     def _auth_metadata_path(self, auth_id):
817         return os.path.join(self.volume_prefix, "${0}{1}".format(
818             auth_id, META_FILE_EXT))
819
820     def _auth_lock(self, auth_id):
821         return self._lock(self._auth_metadata_path(auth_id))
822
823     def _auth_metadata_get(self, auth_id):
824         """
825         Call me with the metadata locked!
826
827         Check whether a auth metadata structure can be decoded by the current
828         version of CephFSVolumeClient.
829
830         Return auth metadata that the current version of CephFSVolumeClient
831         can decode.
832         """
833         auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
834
835         if auth_metadata:
836             self._check_compat_version(auth_metadata['compat_version'])
837
838         return auth_metadata
839
840     def _auth_metadata_set(self, auth_id, data):
841         """
842         Call me with the metadata locked!
843
844         Fsync the auth metadata.
845
846         Add two version attributes to the auth metadata,
847         'compat_version', the minimum CephFSVolumeClient version that can
848         decode the metadata, and 'version', the CephFSVolumeClient version
849         that encoded the metadata.
850         """
851         data['compat_version'] = 1
852         data['version'] = self.version
853         return self._metadata_set(self._auth_metadata_path(auth_id), data)
854
855     def _volume_metadata_path(self, volume_path):
856         return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
857             volume_path.group_id if volume_path.group_id else "",
858             volume_path.volume_id,
859             META_FILE_EXT
860         ))
861
862     def _volume_lock(self, volume_path):
863         """
864         Return a ContextManager which locks the authorization metadata for
865         a particular volume, and persists a flag to the metadata indicating
866         that it is currently locked, so that we can detect dirty situations
867         during recovery.
868
869         This lock isn't just to make access to the metadata safe: it's also
870         designed to be used over the two-step process of checking the
871         metadata and then responding to an authorization request, to
872         ensure that at the point we respond the metadata hasn't changed
873         in the background.  It's key to how we avoid security holes
874         resulting from races during that problem ,
875         """
876         return self._lock(self._volume_metadata_path(volume_path))
877
878     def _volume_metadata_get(self, volume_path):
879         """
880         Call me with the metadata locked!
881
882         Check whether a volume metadata structure can be decoded by the current
883         version of CephFSVolumeClient.
884
885         Return a volume_metadata structure that the current version of
886         CephFSVolumeClient can decode.
887         """
888         volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
889
890         if volume_metadata:
891             self._check_compat_version(volume_metadata['compat_version'])
892
893         return volume_metadata
894
895     def _volume_metadata_set(self, volume_path, data):
896         """
897         Call me with the metadata locked!
898
899         Add two version attributes to the volume metadata,
900         'compat_version', the minimum CephFSVolumeClient version that can
901         decode the metadata and 'version', the CephFSVolumeClient version
902         that encoded the metadata.
903         """
904         data['compat_version'] = 1
905         data['version'] = self.version
906         return self._metadata_set(self._volume_metadata_path(volume_path), data)
907
908     def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
909         """
910         Get-or-create a Ceph auth identity for `auth_id` and grant them access
911         to
912         :param volume_path:
913         :param auth_id:
914         :param readonly:
915         :param tenant_id: Optionally provide a stringizable object to
916                           restrict any created cephx IDs to other callers
917                           passing the same tenant ID.
918         :return:
919         """
920
921         with self._auth_lock(auth_id):
922             # Existing meta, or None, to be updated
923             auth_meta = self._auth_metadata_get(auth_id)
924
925             # volume data to be inserted
926             volume_path_str = str(volume_path)
927             volume = {
928                 volume_path_str : {
929                     # The access level at which the auth_id is authorized to
930                     # access the volume.
931                     'access_level': 'r' if readonly else 'rw',
932                     'dirty': True,
933                 }
934             }
935             if auth_meta is None:
936                 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
937                     auth_id, tenant_id
938                 ))
939                 log.debug("Authorize: no existing meta")
940                 auth_meta = {
941                     'dirty': True,
942                     'tenant_id': tenant_id.__str__() if tenant_id else None,
943                     'volumes': volume
944                 }
945
946                 # Note: this is *not* guaranteeing that the key doesn't already
947                 # exist in Ceph: we are allowing VolumeClient tenants to
948                 # 'claim' existing Ceph keys.  In order to prevent VolumeClient
949                 # tenants from reading e.g. client.admin keys, you need to
950                 # have configured your VolumeClient user (e.g. Manila) to
951                 # have mon auth caps that prevent it from accessing those keys
952                 # (e.g. limit it to only access keys with a manila.* prefix)
953             else:
954                 # Disallow tenants to share auth IDs
955                 if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
956                     msg = "auth ID: {0} is already in use".format(auth_id)
957                     log.error(msg)
958                     raise CephFSVolumeClientError(msg)
959
960                 if auth_meta['dirty']:
961                     self._recover_auth_meta(auth_id, auth_meta)
962
963                 log.debug("Authorize: existing tenant {tenant}".format(
964                     tenant=auth_meta['tenant_id']
965                 ))
966                 auth_meta['dirty'] = True
967                 auth_meta['volumes'].update(volume)
968
969             self._auth_metadata_set(auth_id, auth_meta)
970
971             with self._volume_lock(volume_path):
972                 key = self._authorize_volume(volume_path, auth_id, readonly)
973
974             auth_meta['dirty'] = False
975             auth_meta['volumes'][volume_path_str]['dirty'] = False
976             self._auth_metadata_set(auth_id, auth_meta)
977
978             if tenant_id:
979                 return {
980                     'auth_key': key
981                 }
982             else:
983                 # Caller wasn't multi-tenant aware: be safe and don't give
984                 # them a key
985                 return {
986                     'auth_key': None
987                 }
988
989     def _authorize_volume(self, volume_path, auth_id, readonly):
990         vol_meta = self._volume_metadata_get(volume_path)
991
992         access_level = 'r' if readonly else 'rw'
993         auth = {
994             auth_id: {
995                 'access_level': access_level,
996                 'dirty': True,
997             }
998         }
999
1000         if vol_meta is None:
1001             vol_meta = {
1002                 'auths': auth
1003             }
1004         else:
1005             vol_meta['auths'].update(auth)
1006             self._volume_metadata_set(volume_path, vol_meta)
1007
1008         key = self._authorize_ceph(volume_path, auth_id, readonly)
1009
1010         vol_meta['auths'][auth_id]['dirty'] = False
1011         self._volume_metadata_set(volume_path, vol_meta)
1012
1013         return key
1014
1015     def _authorize_ceph(self, volume_path, auth_id, readonly):
1016         path = self._get_path(volume_path)
1017         log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
1018             auth_id, path
1019         ))
1020
1021         # First I need to work out what the data pool is for this share:
1022         # read the layout
1023         pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1024         namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
1025
1026         # Now construct auth capabilities that give the guest just enough
1027         # permissions to access the share
1028         client_entity = "client.{0}".format(auth_id)
1029         want_access_level = 'r' if readonly else 'rw'
1030         want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
1031         want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1032             want_access_level, pool_name, namespace)
1033
1034         try:
1035             existing = self._rados_command(
1036                 'auth get',
1037                 {
1038                     'entity': client_entity
1039                 }
1040             )
1041             # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1042         except rados.Error:
1043             caps = self._rados_command(
1044                 'auth get-or-create',
1045                 {
1046                     'entity': client_entity,
1047                     'caps': [
1048                         'mds', want_mds_cap,
1049                         'osd', want_osd_cap,
1050                         'mon', 'allow r']
1051                 })
1052         else:
1053             # entity exists, update it
1054             cap = existing[0]
1055
1056             # Construct auth caps that if present might conflict with the desired
1057             # auth caps.
1058             unwanted_access_level = 'r' if want_access_level is 'rw' else 'rw'
1059             unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
1060             unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1061                 unwanted_access_level, pool_name, namespace)
1062
1063             def cap_update(orig, want, unwanted):
1064                 # Updates the existing auth caps such that there is a single
1065                 # occurrence of wanted auth caps and no occurrence of
1066                 # conflicting auth caps.
1067
1068                 if not orig:
1069                     return want
1070
1071                 cap_tokens = set(orig.split(","))
1072
1073                 cap_tokens.discard(unwanted)
1074                 cap_tokens.add(want)
1075
1076                 return ",".join(cap_tokens)
1077
1078             osd_cap_str = cap_update(cap['caps'].get('osd', ""), want_osd_cap, unwanted_osd_cap)
1079             mds_cap_str = cap_update(cap['caps'].get('mds', ""), want_mds_cap, unwanted_mds_cap)
1080
1081             caps = self._rados_command(
1082                 'auth caps',
1083                 {
1084                     'entity': client_entity,
1085                     'caps': [
1086                         'mds', mds_cap_str,
1087                         'osd', osd_cap_str,
1088                         'mon', cap['caps'].get('mon', 'allow r')]
1089                 })
1090             caps = self._rados_command(
1091                 'auth get',
1092                 {
1093                     'entity': client_entity
1094                 }
1095             )
1096
1097         # Result expected like this:
1098         # [
1099         #     {
1100         #         "entity": "client.foobar",
1101         #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
1102         #         "caps": {
1103         #             "mds": "allow *",
1104         #             "mon": "allow *"
1105         #         }
1106         #     }
1107         # ]
1108         assert len(caps) == 1
1109         assert caps[0]['entity'] == client_entity
1110         return caps[0]['key']
1111
1112     def deauthorize(self, volume_path, auth_id):
1113         with self._auth_lock(auth_id):
1114             # Existing meta, or None, to be updated
1115             auth_meta = self._auth_metadata_get(auth_id)
1116
1117             volume_path_str = str(volume_path)
1118             if (auth_meta is None) or (not auth_meta['volumes']):
1119                 log.warn("deauthorized called for already-removed auth"
1120                          "ID '{auth_id}' for volume ID '{volume}'".format(
1121                     auth_id=auth_id, volume=volume_path.volume_id
1122                 ))
1123                 # Clean up the auth meta file of an auth ID
1124                 self.fs.unlink(self._auth_metadata_path(auth_id))
1125                 return
1126
1127             if volume_path_str not in auth_meta['volumes']:
1128                 log.warn("deauthorized called for already-removed auth"
1129                          "ID '{auth_id}' for volume ID '{volume}'".format(
1130                     auth_id=auth_id, volume=volume_path.volume_id
1131                 ))
1132                 return
1133
1134             if auth_meta['dirty']:
1135                 self._recover_auth_meta(auth_id, auth_meta)
1136
1137             auth_meta['dirty'] = True
1138             auth_meta['volumes'][volume_path_str]['dirty'] = True
1139             self._auth_metadata_set(auth_id, auth_meta)
1140
1141             self._deauthorize_volume(volume_path, auth_id)
1142
1143             # Filter out the volume we're deauthorizing
1144             del auth_meta['volumes'][volume_path_str]
1145
1146             # Clean up auth meta file
1147             if not auth_meta['volumes']:
1148                 self.fs.unlink(self._auth_metadata_path(auth_id))
1149                 return
1150
1151             auth_meta['dirty'] = False
1152             self._auth_metadata_set(auth_id, auth_meta)
1153
1154     def _deauthorize_volume(self, volume_path, auth_id):
1155         with self._volume_lock(volume_path):
1156             vol_meta = self._volume_metadata_get(volume_path)
1157
1158             if (vol_meta is None) or (auth_id not in vol_meta['auths']):
1159                 log.warn("deauthorized called for already-removed auth"
1160                          "ID '{auth_id}' for volume ID '{volume}'".format(
1161                     auth_id=auth_id, volume=volume_path.volume_id
1162                 ))
1163                 return
1164
1165             vol_meta['auths'][auth_id]['dirty'] = True
1166             self._volume_metadata_set(volume_path, vol_meta)
1167
1168             self._deauthorize(volume_path, auth_id)
1169
1170             # Remove the auth_id from the metadata *after* removing it
1171             # from ceph, so that if we crashed here, we would actually
1172             # recreate the auth ID during recovery (i.e. end up with
1173             # a consistent state).
1174
1175             # Filter out the auth we're removing
1176             del vol_meta['auths'][auth_id]
1177             self._volume_metadata_set(volume_path, vol_meta)
1178
1179     def _deauthorize(self, volume_path, auth_id):
1180         """
1181         The volume must still exist.
1182         """
1183         client_entity = "client.{0}".format(auth_id)
1184         path = self._get_path(volume_path)
1185         pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1186         namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
1187
1188         # The auth_id might have read-only or read-write mount access for the
1189         # volume path.
1190         access_levels = ('r', 'rw')
1191         want_mds_caps = {'allow {0} path={1}'.format(access_level, path)
1192                          for access_level in access_levels}
1193         want_osd_caps = {'allow {0} pool={1} namespace={2}'.format(
1194                          access_level, pool_name, namespace)
1195                          for access_level in access_levels}
1196
1197         try:
1198             existing = self._rados_command(
1199                 'auth get',
1200                 {
1201                     'entity': client_entity
1202                 }
1203             )
1204
1205             def cap_remove(orig, want):
1206                 cap_tokens = set(orig.split(","))
1207                 return ",".join(cap_tokens.difference(want))
1208
1209             cap = existing[0]
1210             osd_cap_str = cap_remove(cap['caps'].get('osd', ""), want_osd_caps)
1211             mds_cap_str = cap_remove(cap['caps'].get('mds', ""), want_mds_caps)
1212             if (not osd_cap_str) and (not mds_cap_str):
1213                 self._rados_command('auth del', {'entity': client_entity}, decode=False)
1214             else:
1215                 self._rados_command(
1216                     'auth caps',
1217                     {
1218                         'entity': client_entity,
1219                         'caps': [
1220                             'mds', mds_cap_str,
1221                             'osd', osd_cap_str,
1222                             'mon', cap['caps'].get('mon', 'allow r')]
1223                     })
1224
1225         # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1226         except rados.Error:
1227             # Already gone, great.
1228             return
1229
1230     def get_authorized_ids(self, volume_path):
1231         """
1232         Expose a list of auth IDs that have access to a volume.
1233
1234         return: a list of (auth_id, access_level) tuples, where
1235                 the access_level can be 'r' , or 'rw'.
1236                 None if no auth ID is given access to the volume.
1237         """
1238         with self._volume_lock(volume_path):
1239             meta = self._volume_metadata_get(volume_path)
1240             auths = []
1241             if not meta or not meta['auths']:
1242                 return None
1243
1244             for auth, auth_data in meta['auths'].items():
1245                 # Skip partial auth updates.
1246                 if not auth_data['dirty']:
1247                     auths.append((auth, auth_data['access_level']))
1248
1249             return auths
1250
1251     def _rados_command(self, prefix, args=None, decode=True):
1252         """
1253         Safer wrapper for ceph_argparse.json_command, which raises
1254         Error exception instead of relying on caller to check return
1255         codes.
1256
1257         Error exception can result from:
1258         * Timeout
1259         * Actual legitimate errors
1260         * Malformed JSON output
1261
1262         return: Decoded object from ceph, or None if empty string returned.
1263                 If decode is False, return a string (the data returned by
1264                 ceph command)
1265         """
1266         if args is None:
1267             args = {}
1268
1269         argdict = args.copy()
1270         argdict['format'] = 'json'
1271
1272         ret, outbuf, outs = json_command(self.rados,
1273                                          prefix=prefix,
1274                                          argdict=argdict,
1275                                          timeout=RADOS_TIMEOUT)
1276         if ret != 0:
1277             raise rados.Error(outs)
1278         else:
1279             if decode:
1280                 if outbuf:
1281                     try:
1282                         return json.loads(outbuf)
1283                     except (ValueError, TypeError):
1284                         raise RadosError("Invalid JSON output for command {0}".format(argdict))
1285                 else:
1286                     return None
1287             else:
1288                 return outbuf
1289
1290     def get_used_bytes(self, volume_path):
1291         return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes"))
1292
1293     def set_max_bytes(self, volume_path, max_bytes):
1294         self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
1295                          max_bytes.__str__() if max_bytes is not None else "0",
1296                          0)
1297
1298     def _snapshot_path(self, dir_path, snapshot_name):
1299         return os.path.join(
1300             dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
1301         )
1302
1303     def _snapshot_create(self, dir_path, snapshot_name):
1304         # TODO: raise intelligible exception for clusters where snaps are disabled
1305         self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0o755)
1306
1307     def _snapshot_destroy(self, dir_path, snapshot_name):
1308         """
1309         Remove a snapshot, or do nothing if it already doesn't exist.
1310         """
1311         try:
1312             self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
1313         except cephfs.ObjectNotFound:
1314             log.warn("Snapshot was already gone: {0}".format(snapshot_name))
1315
1316     def create_snapshot_volume(self, volume_path, snapshot_name):
1317         self._snapshot_create(self._get_path(volume_path), snapshot_name)
1318
1319     def destroy_snapshot_volume(self, volume_path, snapshot_name):
1320         self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
1321
1322     def create_snapshot_group(self, group_id, snapshot_name):
1323         if group_id is None:
1324             raise RuntimeError("Group ID may not be None")
1325
1326         return self._snapshot_create(self._get_group_path(group_id), snapshot_name)
1327
1328     def destroy_snapshot_group(self, group_id, snapshot_name):
1329         if group_id is None:
1330             raise RuntimeError("Group ID may not be None")
1331         if snapshot_name is None:
1332             raise RuntimeError("Snapshot name may not be None")
1333
1334         return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
1335
1336     def _cp_r(self, src, dst):
1337         # TODO
1338         raise NotImplementedError()
1339
1340     def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
1341         dest_fs_path = self._get_path(dest_volume_path)
1342         src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
1343
1344         self._cp_r(src_snapshot_path, dest_fs_path)
1345
1346     def put_object(self, pool_name, object_name, data):
1347         """
1348         Synchronously write data to an object.
1349
1350         :param pool_name: name of the pool
1351         :type pool_name: str
1352         :param object_name: name of the object
1353         :type object_name: str
1354         :param data: data to write
1355         :type data: bytes
1356         """
1357         ioctx = self.rados.open_ioctx(pool_name)
1358         max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1359         if len(data) > max_size:
1360             msg = ("Data to be written to object '{0}' exceeds "
1361                    "{1} bytes".format(object_name, max_size))
1362             log.error(msg)
1363             raise CephFSVolumeClientError(msg)
1364         try:
1365             ioctx.write_full(object_name, data)
1366         finally:
1367             ioctx.close()
1368
1369     def get_object(self, pool_name, object_name):
1370         """
1371         Synchronously read data from object.
1372
1373         :param pool_name: name of the pool
1374         :type pool_name: str
1375         :param object_name: name of the object
1376         :type object_name: str
1377
1378         :returns: bytes - data read from object
1379         """
1380         ioctx = self.rados.open_ioctx(pool_name)
1381         max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1382         try:
1383             bytes_read = ioctx.read(object_name, max_size)
1384             if ((len(bytes_read) == max_size) and
1385                     (ioctx.read(object_name, 1, offset=max_size))):
1386                 log.warning("Size of object {0} exceeds '{1}' bytes "
1387                             "read".format(object_name, max_size))
1388         finally:
1389             ioctx.close()
1390         return bytes_read
1391
1392     def delete_object(self, pool_name, object_name):
1393         ioctx = self.rados.open_ioctx(pool_name)
1394         try:
1395             ioctx.remove_object(object_name)
1396         except rados.ObjectNotFound:
1397             log.warn("Object '{0}' was already removed".format(object_name))
1398         finally:
1399             ioctx.close()