X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fpybind%2Fmgr%2Frestful%2Fmodule.py;fp=src%2Fceph%2Fsrc%2Fpybind%2Fmgr%2Frestful%2Fmodule.py;h=6ce610b881f4d06d98ae0e25fca3e84721dfeb47;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/pybind/mgr/restful/module.py b/src/ceph/src/pybind/mgr/restful/module.py new file mode 100644 index 0000000..6ce610b --- /dev/null +++ b/src/ceph/src/pybind/mgr/restful/module.py @@ -0,0 +1,613 @@ +""" +A RESTful API for Ceph +""" + +import os +import json +import time +import errno +import inspect +import tempfile +import threading +import traceback +import socket + +import common + +from uuid import uuid4 +from pecan import jsonify, make_app +from OpenSSL import crypto +from pecan.rest import RestController +from werkzeug.serving import make_server, make_ssl_devcert + +from hooks import ErrorHook +from mgr_module import MgrModule, CommandResult + +# Global instance to share +instance = None + + +class CannotServe(Exception): + pass + + +class CommandsRequest(object): + """ + This class handles parallel as well as sequential execution of + commands. The class accept a list of iterables that should be + executed sequentially. Each iterable can contain several commands + that can be executed in parallel. + + Example: + [[c1,c2],[c3,c4]] + - run c1 and c2 in parallel + - wait for them to finish + - run c3 and c4 in parallel + - wait for them to finish + """ + + + def __init__(self, commands_arrays): + self.id = str(id(self)) + + # Filter out empty sub-requests + commands_arrays = filter( + lambda x: len(x) != 0, + commands_arrays, + ) + + self.running = [] + self.waiting = commands_arrays[1:] + self.finished = [] + self.failed = [] + + self.lock = threading.RLock() + if not len(commands_arrays): + # Nothing to run + return + + # Process first iteration of commands_arrays in parallel + results = self.run(commands_arrays[0]) + + self.running.extend(results) + + + def run(self, commands): + """ + A static method that will execute the given list of commands in + parallel and will return the list of command results. + """ + + # Gather the results (in parallel) + results = [] + for index in range(len(commands)): + tag = '%s:%d' % (str(self.id), index) + + # Store the result + result = CommandResult(tag) + result.command = common.humanify_command(commands[index]) + results.append(result) + + # Run the command + instance.send_command(result, 'mon', '', json.dumps(commands[index]), tag) + + return results + + + def next(self): + with self.lock: + if not self.waiting: + # Nothing to run + return + + # Run a next iteration of commands + commands = self.waiting[0] + self.waiting = self.waiting[1:] + + self.running.extend(self.run(commands)) + + + def finish(self, tag): + with self.lock: + for index in range(len(self.running)): + if self.running[index].tag == tag: + if self.running[index].r == 0: + self.finished.append(self.running.pop(index)) + else: + self.failed.append(self.running.pop(index)) + return True + + # No such tag found + return False + + + def is_running(self, tag): + for result in self.running: + if result.tag == tag: + return True + return False + + + def is_ready(self): + with self.lock: + return not self.running and self.waiting + + + def is_waiting(self): + return bool(self.waiting) + + + def is_finished(self): + with self.lock: + return not self.running and not self.waiting + + + def has_failed(self): + return bool(self.failed) + + + def get_state(self): + with self.lock: + if not self.is_finished(): + return "pending" + + if self.has_failed(): + return "failed" + + return "success" + + + def __json__(self): + return { + 'id': self.id, + 'running': map( + lambda x: { + 'command': x.command, + 'outs': x.outs, + 'outb': x.outb, + }, + self.running + ), + 'finished': map( + lambda x: { + 'command': x.command, + 'outs': x.outs, + 'outb': x.outb, + }, + self.finished + ), + 'waiting': map( + lambda x: { + 'command': x.command, + 'outs': x.outs, + 'outb': x.outb, + }, + self.waiting + ), + 'failed': map( + lambda x: { + 'command': x.command, + 'outs': x.outs, + 'outb': x.outb, + }, + self.failed + ), + 'is_waiting': self.is_waiting(), + 'is_finished': self.is_finished(), + 'has_failed': self.has_failed(), + 'state': self.get_state(), + } + + + +class Module(MgrModule): + COMMANDS = [ + { + "cmd": "restful create-key name=key_name,type=CephString", + "desc": "Create an API key with this name", + "perm": "rw" + }, + { + "cmd": "restful delete-key name=key_name,type=CephString", + "desc": "Delete an API key with this name", + "perm": "rw" + }, + { + "cmd": "restful list-keys", + "desc": "List all API keys", + "perm": "rw" + }, + { + "cmd": "restful create-self-signed-cert", + "desc": "Create localized self signed certificate", + "perm": "rw" + }, + { + "cmd": "restful restart", + "desc": "Restart API server", + "perm": "rw" + }, + ] + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + global instance + instance = self + + self.requests = [] + self.requests_lock = threading.RLock() + + self.keys = {} + self.disable_auth = False + + self.server = None + + self.stop_server = False + self.serve_event = threading.Event() + + + def serve(self): + while not self.stop_server: + try: + self._serve() + self.server.socket.close() + except CannotServe as cs: + self.log.warn("server not running: {0}".format(cs.message)) + except: + self.log.error(str(traceback.format_exc())) + + # Wait and clear the threading event + self.serve_event.wait() + self.serve_event.clear() + + def refresh_keys(self): + self.keys = {} + rawkeys = self.get_config_prefix('keys/') or {} + for k, v in rawkeys.iteritems(): + self.keys[k[5:]] = v # strip of keys/ prefix + + def _serve(self): + # Load stored authentication keys + self.refresh_keys() + + jsonify._instance = jsonify.GenericJSON( + sort_keys=True, + indent=4, + separators=(',', ': '), + ) + + server_addr = self.get_localized_config('server_addr', '::') + if server_addr is None: + raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr "') + + server_port = int(self.get_localized_config('server_port', '8003')) + self.log.info('server_addr: %s server_port: %d', + server_addr, server_port) + + cert = self.get_localized_config("crt") + if cert is not None: + cert_tmp = tempfile.NamedTemporaryFile() + cert_tmp.write(cert) + cert_tmp.flush() + cert_fname = cert_tmp.name + else: + cert_fname = self.get_localized_config('crt_file') + + pkey = self.get_localized_config("key") + if pkey is not None: + pkey_tmp = tempfile.NamedTemporaryFile() + pkey_tmp.write(pkey) + pkey_tmp.flush() + pkey_fname = pkey_tmp.name + else: + pkey_fname = self.get_localized_config('key_file') + + if not cert_fname or not pkey_fname: + raise CannotServe('no certificate configured') + if not os.path.isfile(cert_fname): + raise CannotServe('certificate %s does not exist' % cert_fname) + if not os.path.isfile(pkey_fname): + raise CannotServe('private key %s does not exist' % pkey_fname) + + # Publish the URI that others may use to access the service we're + # about to start serving + self.set_uri("https://{0}:{1}/".format( + socket.gethostname() if server_addr == "::" else server_addr, + server_port + )) + + # Create the HTTPS werkzeug server serving pecan app + self.server = make_server( + host=server_addr, + port=server_port, + app=make_app( + root='restful.api.Root', + hooks = [ErrorHook()], # use a callable if pecan >= 0.3.2 + ), + ssl_context=(cert_fname, pkey_fname), + ) + + self.server.serve_forever() + + + def shutdown(self): + try: + self.stop_server = True + if self.server: + self.server.shutdown() + self.serve_event.set() + except: + self.log.error(str(traceback.format_exc())) + raise + + + def restart(self): + try: + if self.server: + self.server.shutdown() + self.serve_event.set() + except: + self.log.error(str(traceback.format_exc())) + + + def notify(self, notify_type, tag): + try: + self._notify(notify_type, tag) + except: + self.log.error(str(traceback.format_exc())) + + + def _notify(self, notify_type, tag): + if notify_type == "command": + # we can safely skip all the sequential commands + if tag == 'seq': + return + + request = filter( + lambda x: x.is_running(tag), + self.requests) + + if len(request) != 1: + self.log.warn("Unknown request '%s'" % str(tag)) + return + + request = request[0] + request.finish(tag) + if request.is_ready(): + request.next() + else: + self.log.debug("Unhandled notification type '%s'" % notify_type) + + + def create_self_signed_cert(self): + # create a key pair + pkey = crypto.PKey() + pkey.generate_key(crypto.TYPE_RSA, 2048) + + # create a self-signed cert + cert = crypto.X509() + cert.get_subject().O = "IT" + cert.get_subject().CN = "ceph-restful" + cert.set_serial_number(int(uuid4())) + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(10*365*24*60*60) + cert.set_issuer(cert.get_subject()) + cert.set_pubkey(pkey) + cert.sign(pkey, 'sha512') + + return ( + crypto.dump_certificate(crypto.FILETYPE_PEM, cert), + crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey) + ) + + + def handle_command(self, command): + self.log.warn("Handling command: '%s'" % str(command)) + if command['prefix'] == "restful create-key": + if command['key_name'] in self.keys: + return 0, self.keys[command['key_name']], "" + + else: + key = str(uuid4()) + self.keys[command['key_name']] = key + self.set_config('keys/' + command['key_name'], key) + + return ( + 0, + self.keys[command['key_name']], + "", + ) + + elif command['prefix'] == "restful delete-key": + if command['key_name'] in self.keys: + del self.keys[command['key_name']] + self.set_config('keys/' + command['key_name'], None) + + return ( + 0, + "", + "", + ) + + elif command['prefix'] == "restful list-keys": + self.refresh_keys() + return ( + 0, + json.dumps(self.keys, indent=2), + "", + ) + + elif command['prefix'] == "restful create-self-signed-cert": + cert, pkey = self.create_self_signed_cert() + + self.set_config(self.get_mgr_id() + '/crt', cert) + self.set_config(self.get_mgr_id() + '/key', pkey) + + self.restart() + return ( + 0, + "Restarting RESTful API server...", + "" + ) + + elif command['prefix'] == 'restful restart': + self.restart(); + return ( + 0, + "Restarting RESTful API server...", + "" + ) + + else: + return ( + -errno.EINVAL, + "", + "Command not found '{0}'".format(command['prefix']) + ) + + + def get_doc_api(self, root, prefix=''): + doc = {} + for _obj in dir(root): + obj = getattr(root, _obj) + + if isinstance(obj, RestController): + doc.update(self.get_doc_api(obj, prefix + '/' + _obj)) + + if getattr(root, '_lookup', None) and isinstance(root._lookup('0')[0], RestController): + doc.update(self.get_doc_api(root._lookup('0')[0], prefix + '/')) + + prefix = prefix or '/' + + doc[prefix] = {} + for method in 'get', 'post', 'patch', 'delete': + if getattr(root, method, None): + doc[prefix][method.upper()] = inspect.getdoc(getattr(root, method)).split('\n') + + if len(doc[prefix]) == 0: + del doc[prefix] + + return doc + + + def get_mons(self): + mon_map_mons = self.get('mon_map')['mons'] + mon_status = json.loads(self.get('mon_status')['json']) + + # Add more information + for mon in mon_map_mons: + mon['in_quorum'] = mon['rank'] in mon_status['quorum'] + mon['server'] = self.get_metadata("mon", mon['name'])['hostname'] + mon['leader'] = mon['rank'] == mon_status['quorum'][0] + + return mon_map_mons + + + def get_osd_pools(self): + osds = dict(map(lambda x: (x['osd'], []), self.get('osd_map')['osds'])) + pools = dict(map(lambda x: (x['pool'], x), self.get('osd_map')['pools'])) + crush_rules = self.get('osd_map_crush')['rules'] + + osds_by_pool = {} + for pool_id, pool in pools.items(): + pool_osds = None + for rule in [r for r in crush_rules if r['rule_id'] == pool['crush_rule']]: + if rule['min_size'] <= pool['size'] <= rule['max_size']: + pool_osds = common.crush_rule_osds(self.get('osd_map_tree')['nodes'], rule) + + osds_by_pool[pool_id] = pool_osds + + for pool_id in pools.keys(): + for in_pool_id in osds_by_pool[pool_id]: + osds[in_pool_id].append(pool_id) + + return osds + + + def get_osds(self, pool_id=None, ids=None): + # Get data + osd_map = self.get('osd_map') + osd_metadata = self.get('osd_metadata') + + # Update the data with the additional info from the osd map + osds = osd_map['osds'] + + # Filter by osd ids + if ids is not None: + osds = filter( + lambda x: str(x['osd']) in ids, + osds + ) + + # Get list of pools per osd node + pools_map = self.get_osd_pools() + + # map osd IDs to reweight + reweight_map = dict([ + (x.get('id'), x.get('reweight', None)) + for x in self.get('osd_map_tree')['nodes'] + ]) + + # Build OSD data objects + for osd in osds: + osd['pools'] = pools_map[osd['osd']] + osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None) + + osd['reweight'] = reweight_map.get(osd['osd'], 0.0) + + if osd['up']: + osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS + else: + osd['valid_commands'] = [] + + # Filter by pool + if pool_id: + pool_id = int(pool_id) + osds = filter( + lambda x: pool_id in x['pools'], + osds + ) + + return osds + + + def get_osd_by_id(self, osd_id): + osd = filter( + lambda x: x['osd'] == osd_id, + self.get('osd_map')['osds'] + ) + + if len(osd) != 1: + return None + + return osd[0] + + + def get_pool_by_id(self, pool_id): + pool = filter( + lambda x: x['pool'] == pool_id, + self.get('osd_map')['pools'], + ) + + if len(pool) != 1: + return None + + return pool[0] + + + def submit_request(self, _request, **kwargs): + request = CommandsRequest(_request) + with self.requests_lock: + self.requests.append(request) + if kwargs.get('wait', 0): + while not request.is_finished(): + time.sleep(0.001) + return request + + + def run_command(self, command): + # tag with 'seq' so that we can ingore these in notify function + result = CommandResult('seq') + + self.send_command(result, 'mon', '', json.dumps(command), 'seq') + return result.wait()