X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fpybind%2Fmgr%2Frestful%2Fmodule.py;fp=src%2Fceph%2Fsrc%2Fpybind%2Fmgr%2Frestful%2Fmodule.py;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=6ce610b881f4d06d98ae0e25fca3e84721dfeb47;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/pybind/mgr/restful/module.py b/src/ceph/src/pybind/mgr/restful/module.py deleted file mode 100644 index 6ce610b..0000000 --- a/src/ceph/src/pybind/mgr/restful/module.py +++ /dev/null @@ -1,613 +0,0 @@ -""" -A RESTful API for Ceph -""" - -import os -import json -import time -import errno -import inspect -import tempfile -import threading -import traceback -import socket - -import common - -from uuid import uuid4 -from pecan import jsonify, make_app -from OpenSSL import crypto -from pecan.rest import RestController -from werkzeug.serving import make_server, make_ssl_devcert - -from hooks import ErrorHook -from mgr_module import MgrModule, CommandResult - -# Global instance to share -instance = None - - -class CannotServe(Exception): - pass - - -class CommandsRequest(object): - """ - This class handles parallel as well as sequential execution of - commands. The class accept a list of iterables that should be - executed sequentially. Each iterable can contain several commands - that can be executed in parallel. - - Example: - [[c1,c2],[c3,c4]] - - run c1 and c2 in parallel - - wait for them to finish - - run c3 and c4 in parallel - - wait for them to finish - """ - - - def __init__(self, commands_arrays): - self.id = str(id(self)) - - # Filter out empty sub-requests - commands_arrays = filter( - lambda x: len(x) != 0, - commands_arrays, - ) - - self.running = [] - self.waiting = commands_arrays[1:] - self.finished = [] - self.failed = [] - - self.lock = threading.RLock() - if not len(commands_arrays): - # Nothing to run - return - - # Process first iteration of commands_arrays in parallel - results = self.run(commands_arrays[0]) - - self.running.extend(results) - - - def run(self, commands): - """ - A static method that will execute the given list of commands in - parallel and will return the list of command results. - """ - - # Gather the results (in parallel) - results = [] - for index in range(len(commands)): - tag = '%s:%d' % (str(self.id), index) - - # Store the result - result = CommandResult(tag) - result.command = common.humanify_command(commands[index]) - results.append(result) - - # Run the command - instance.send_command(result, 'mon', '', json.dumps(commands[index]), tag) - - return results - - - def next(self): - with self.lock: - if not self.waiting: - # Nothing to run - return - - # Run a next iteration of commands - commands = self.waiting[0] - self.waiting = self.waiting[1:] - - self.running.extend(self.run(commands)) - - - def finish(self, tag): - with self.lock: - for index in range(len(self.running)): - if self.running[index].tag == tag: - if self.running[index].r == 0: - self.finished.append(self.running.pop(index)) - else: - self.failed.append(self.running.pop(index)) - return True - - # No such tag found - return False - - - def is_running(self, tag): - for result in self.running: - if result.tag == tag: - return True - return False - - - def is_ready(self): - with self.lock: - return not self.running and self.waiting - - - def is_waiting(self): - return bool(self.waiting) - - - def is_finished(self): - with self.lock: - return not self.running and not self.waiting - - - def has_failed(self): - return bool(self.failed) - - - def get_state(self): - with self.lock: - if not self.is_finished(): - return "pending" - - if self.has_failed(): - return "failed" - - return "success" - - - def __json__(self): - return { - 'id': self.id, - 'running': map( - lambda x: { - 'command': x.command, - 'outs': x.outs, - 'outb': x.outb, - }, - self.running - ), - 'finished': map( - lambda x: { - 'command': x.command, - 'outs': x.outs, - 'outb': x.outb, - }, - self.finished - ), - 'waiting': map( - lambda x: { - 'command': x.command, - 'outs': x.outs, - 'outb': x.outb, - }, - self.waiting - ), - 'failed': map( - lambda x: { - 'command': x.command, - 'outs': x.outs, - 'outb': x.outb, - }, - self.failed - ), - 'is_waiting': self.is_waiting(), - 'is_finished': self.is_finished(), - 'has_failed': self.has_failed(), - 'state': self.get_state(), - } - - - -class Module(MgrModule): - COMMANDS = [ - { - "cmd": "restful create-key name=key_name,type=CephString", - "desc": "Create an API key with this name", - "perm": "rw" - }, - { - "cmd": "restful delete-key name=key_name,type=CephString", - "desc": "Delete an API key with this name", - "perm": "rw" - }, - { - "cmd": "restful list-keys", - "desc": "List all API keys", - "perm": "rw" - }, - { - "cmd": "restful create-self-signed-cert", - "desc": "Create localized self signed certificate", - "perm": "rw" - }, - { - "cmd": "restful restart", - "desc": "Restart API server", - "perm": "rw" - }, - ] - - def __init__(self, *args, **kwargs): - super(Module, self).__init__(*args, **kwargs) - global instance - instance = self - - self.requests = [] - self.requests_lock = threading.RLock() - - self.keys = {} - self.disable_auth = False - - self.server = None - - self.stop_server = False - self.serve_event = threading.Event() - - - def serve(self): - while not self.stop_server: - try: - self._serve() - self.server.socket.close() - except CannotServe as cs: - self.log.warn("server not running: {0}".format(cs.message)) - except: - self.log.error(str(traceback.format_exc())) - - # Wait and clear the threading event - self.serve_event.wait() - self.serve_event.clear() - - def refresh_keys(self): - self.keys = {} - rawkeys = self.get_config_prefix('keys/') or {} - for k, v in rawkeys.iteritems(): - self.keys[k[5:]] = v # strip of keys/ prefix - - def _serve(self): - # Load stored authentication keys - self.refresh_keys() - - jsonify._instance = jsonify.GenericJSON( - sort_keys=True, - indent=4, - separators=(',', ': '), - ) - - server_addr = self.get_localized_config('server_addr', '::') - if server_addr is None: - raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr "') - - server_port = int(self.get_localized_config('server_port', '8003')) - self.log.info('server_addr: %s server_port: %d', - server_addr, server_port) - - cert = self.get_localized_config("crt") - if cert is not None: - cert_tmp = tempfile.NamedTemporaryFile() - cert_tmp.write(cert) - cert_tmp.flush() - cert_fname = cert_tmp.name - else: - cert_fname = self.get_localized_config('crt_file') - - pkey = self.get_localized_config("key") - if pkey is not None: - pkey_tmp = tempfile.NamedTemporaryFile() - pkey_tmp.write(pkey) - pkey_tmp.flush() - pkey_fname = pkey_tmp.name - else: - pkey_fname = self.get_localized_config('key_file') - - if not cert_fname or not pkey_fname: - raise CannotServe('no certificate configured') - if not os.path.isfile(cert_fname): - raise CannotServe('certificate %s does not exist' % cert_fname) - if not os.path.isfile(pkey_fname): - raise CannotServe('private key %s does not exist' % pkey_fname) - - # Publish the URI that others may use to access the service we're - # about to start serving - self.set_uri("https://{0}:{1}/".format( - socket.gethostname() if server_addr == "::" else server_addr, - server_port - )) - - # Create the HTTPS werkzeug server serving pecan app - self.server = make_server( - host=server_addr, - port=server_port, - app=make_app( - root='restful.api.Root', - hooks = [ErrorHook()], # use a callable if pecan >= 0.3.2 - ), - ssl_context=(cert_fname, pkey_fname), - ) - - self.server.serve_forever() - - - def shutdown(self): - try: - self.stop_server = True - if self.server: - self.server.shutdown() - self.serve_event.set() - except: - self.log.error(str(traceback.format_exc())) - raise - - - def restart(self): - try: - if self.server: - self.server.shutdown() - self.serve_event.set() - except: - self.log.error(str(traceback.format_exc())) - - - def notify(self, notify_type, tag): - try: - self._notify(notify_type, tag) - except: - self.log.error(str(traceback.format_exc())) - - - def _notify(self, notify_type, tag): - if notify_type == "command": - # we can safely skip all the sequential commands - if tag == 'seq': - return - - request = filter( - lambda x: x.is_running(tag), - self.requests) - - if len(request) != 1: - self.log.warn("Unknown request '%s'" % str(tag)) - return - - request = request[0] - request.finish(tag) - if request.is_ready(): - request.next() - else: - self.log.debug("Unhandled notification type '%s'" % notify_type) - - - def create_self_signed_cert(self): - # create a key pair - pkey = crypto.PKey() - pkey.generate_key(crypto.TYPE_RSA, 2048) - - # create a self-signed cert - cert = crypto.X509() - cert.get_subject().O = "IT" - cert.get_subject().CN = "ceph-restful" - cert.set_serial_number(int(uuid4())) - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(10*365*24*60*60) - cert.set_issuer(cert.get_subject()) - cert.set_pubkey(pkey) - cert.sign(pkey, 'sha512') - - return ( - crypto.dump_certificate(crypto.FILETYPE_PEM, cert), - crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey) - ) - - - def handle_command(self, command): - self.log.warn("Handling command: '%s'" % str(command)) - if command['prefix'] == "restful create-key": - if command['key_name'] in self.keys: - return 0, self.keys[command['key_name']], "" - - else: - key = str(uuid4()) - self.keys[command['key_name']] = key - self.set_config('keys/' + command['key_name'], key) - - return ( - 0, - self.keys[command['key_name']], - "", - ) - - elif command['prefix'] == "restful delete-key": - if command['key_name'] in self.keys: - del self.keys[command['key_name']] - self.set_config('keys/' + command['key_name'], None) - - return ( - 0, - "", - "", - ) - - elif command['prefix'] == "restful list-keys": - self.refresh_keys() - return ( - 0, - json.dumps(self.keys, indent=2), - "", - ) - - elif command['prefix'] == "restful create-self-signed-cert": - cert, pkey = self.create_self_signed_cert() - - self.set_config(self.get_mgr_id() + '/crt', cert) - self.set_config(self.get_mgr_id() + '/key', pkey) - - self.restart() - return ( - 0, - "Restarting RESTful API server...", - "" - ) - - elif command['prefix'] == 'restful restart': - self.restart(); - return ( - 0, - "Restarting RESTful API server...", - "" - ) - - else: - return ( - -errno.EINVAL, - "", - "Command not found '{0}'".format(command['prefix']) - ) - - - def get_doc_api(self, root, prefix=''): - doc = {} - for _obj in dir(root): - obj = getattr(root, _obj) - - if isinstance(obj, RestController): - doc.update(self.get_doc_api(obj, prefix + '/' + _obj)) - - if getattr(root, '_lookup', None) and isinstance(root._lookup('0')[0], RestController): - doc.update(self.get_doc_api(root._lookup('0')[0], prefix + '/')) - - prefix = prefix or '/' - - doc[prefix] = {} - for method in 'get', 'post', 'patch', 'delete': - if getattr(root, method, None): - doc[prefix][method.upper()] = inspect.getdoc(getattr(root, method)).split('\n') - - if len(doc[prefix]) == 0: - del doc[prefix] - - return doc - - - def get_mons(self): - mon_map_mons = self.get('mon_map')['mons'] - mon_status = json.loads(self.get('mon_status')['json']) - - # Add more information - for mon in mon_map_mons: - mon['in_quorum'] = mon['rank'] in mon_status['quorum'] - mon['server'] = self.get_metadata("mon", mon['name'])['hostname'] - mon['leader'] = mon['rank'] == mon_status['quorum'][0] - - return mon_map_mons - - - def get_osd_pools(self): - osds = dict(map(lambda x: (x['osd'], []), self.get('osd_map')['osds'])) - pools = dict(map(lambda x: (x['pool'], x), self.get('osd_map')['pools'])) - crush_rules = self.get('osd_map_crush')['rules'] - - osds_by_pool = {} - for pool_id, pool in pools.items(): - pool_osds = None - for rule in [r for r in crush_rules if r['rule_id'] == pool['crush_rule']]: - if rule['min_size'] <= pool['size'] <= rule['max_size']: - pool_osds = common.crush_rule_osds(self.get('osd_map_tree')['nodes'], rule) - - osds_by_pool[pool_id] = pool_osds - - for pool_id in pools.keys(): - for in_pool_id in osds_by_pool[pool_id]: - osds[in_pool_id].append(pool_id) - - return osds - - - def get_osds(self, pool_id=None, ids=None): - # Get data - osd_map = self.get('osd_map') - osd_metadata = self.get('osd_metadata') - - # Update the data with the additional info from the osd map - osds = osd_map['osds'] - - # Filter by osd ids - if ids is not None: - osds = filter( - lambda x: str(x['osd']) in ids, - osds - ) - - # Get list of pools per osd node - pools_map = self.get_osd_pools() - - # map osd IDs to reweight - reweight_map = dict([ - (x.get('id'), x.get('reweight', None)) - for x in self.get('osd_map_tree')['nodes'] - ]) - - # Build OSD data objects - for osd in osds: - osd['pools'] = pools_map[osd['osd']] - osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None) - - osd['reweight'] = reweight_map.get(osd['osd'], 0.0) - - if osd['up']: - osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS - else: - osd['valid_commands'] = [] - - # Filter by pool - if pool_id: - pool_id = int(pool_id) - osds = filter( - lambda x: pool_id in x['pools'], - osds - ) - - return osds - - - def get_osd_by_id(self, osd_id): - osd = filter( - lambda x: x['osd'] == osd_id, - self.get('osd_map')['osds'] - ) - - if len(osd) != 1: - return None - - return osd[0] - - - def get_pool_by_id(self, pool_id): - pool = filter( - lambda x: x['pool'] == pool_id, - self.get('osd_map')['pools'], - ) - - if len(pool) != 1: - return None - - return pool[0] - - - def submit_request(self, _request, **kwargs): - request = CommandsRequest(_request) - with self.requests_lock: - self.requests.append(request) - if kwargs.get('wait', 0): - while not request.is_finished(): - time.sleep(0.001) - return request - - - def run_command(self, command): - # tag with 'seq' so that we can ingore these in notify function - result = CommandResult('seq') - - self.send_command(result, 'mon', '', json.dumps(command), 'seq') - return result.wait()