17 from uuid import uuid4
18 from pecan import jsonify, make_app
19 from OpenSSL import crypto
20 from pecan.rest import RestController
21 from werkzeug.serving import make_server, make_ssl_devcert
23 from hooks import ErrorHook
24 from mgr_module import MgrModule, CommandResult
26 # Global instance to share
# Raised elsewhere in this file with a human-readable reason when the server
# address or the TLS certificate/key configuration is unusable; the serve
# loop catches it and logs "server not running: ...".
30 class CannotServe(Exception):
34 class CommandsRequest(object):
36 This class handles parallel as well as sequential execution of
37 commands. The class accepts a list of iterables that should be
38 executed sequentially. Each iterable can contain several commands
39 that can be executed in parallel.
43 - run c1 and c2 in parallel
44 - wait for them to finish
45 - run c3 and c4 in parallel
46 - wait for them to finish
# Build a request from a list of command batches: empty batches are dropped,
# the first batch is started right away via self.run() and the remaining
# batches are kept in self.waiting for later iterations.
50 def __init__(self, commands_arrays):
# Identifier derived from the object's id(); run() uses it as the tag prefix.
51 self.id = str(id(self))
53 # Filter out empty sub-requests
# NOTE(review): the result of filter() is sliced and indexed below, which
# requires a list (Python 2 behavior) -- confirm the target interpreter.
54 commands_arrays = filter(
55 lambda x: len(x) != 0,
# Everything after the first batch waits for its turn.
60 self.waiting = commands_arrays[1:]
# Re-entrant lock; presumably guards the running/waiting state -- verify
# against the methods that acquire it.
64 self.lock = threading.RLock()
# Nothing left after filtering: there is no first batch to start.
65 if not len(commands_arrays):
69 # Process first iteration of commands_arrays in parallel
70 results = self.run(commands_arrays[0])
# Track the dispatched results as running.
72 self.running.extend(results)
# Dispatch every command of one batch: each command gets a CommandResult
# tagged "<request id>:<index>" and is sent through the module-level
# `instance`; the created results are collected in `results`.
# NOTE(review): the description below says "static method", but the method
# takes `self` and reads self.id.
75 def run(self, commands):
77 A static method that will execute the given list of commands in
78 parallel and will return the list of command results.
81 # Gather the results (in parallel)
83 for index in range(len(commands)):
# The tag pairs the request id with the command's position in the batch;
# is_running() and finish() match results on this tag.
84 tag = '%s:%d' % (str(self.id), index)
87 result = CommandResult(tag)
# Keep a human-readable form of the command on the result object.
88 result.command = common.humanify_command(commands[index])
89 results.append(result)
# Send the JSON-encoded command to the 'mon' target under the same tag.
92 instance.send_command(result, 'mon', '', json.dumps(commands[index]), tag)
103 # Run a next iteration of commands
104 commands = self.waiting[0]
105 self.waiting = self.waiting[1:]
107 self.running.extend(self.run(commands))
# Move the running result whose tag matches `tag` out of self.running:
# into self.finished when its `r` is 0, otherwise into self.failed.
# (`r` is presumably the command's return code -- confirm.)
110 def finish(self, tag):
# NOTE(review): pop() inside an index-based loop is only safe if the loop is
# left right after the first match -- confirm.
112 for index in range(len(self.running)):
113 if self.running[index].tag == tag:
114 if self.running[index].r == 0:
115 self.finished.append(self.running.pop(index))
117 self.failed.append(self.running.pop(index))
# Look through the currently running results for one carrying `tag`.
124 def is_running(self, tag):
125 for result in self.running:
126 if result.tag == tag:
133 return not self.running and self.waiting
# Return True when at least one batch is still queued in self.waiting,
# False otherwise.
136 def is_waiting(self):
137 return bool(self.waiting)
# Truthy only when nothing is running and no batch is left waiting.
140 def is_finished(self):
142 return not self.running and not self.waiting
# Return True when at least one result has been recorded in self.failed.
145 def has_failed(self):
146 return bool(self.failed)
151 if not self.is_finished():
154 if self.has_failed():
165 'command': x.command,
173 'command': x.command,
181 'command': x.command,
189 'command': x.command,
195 'is_waiting': self.is_waiting(),
196 'is_finished': self.is_finished(),
197 'has_failed': self.has_failed(),
198 'state': self.get_state(),
203 class Module(MgrModule):
206 "cmd": "restful create-key name=key_name,type=CephString",
207 "desc": "Create an API key with this name",
211 "cmd": "restful delete-key name=key_name,type=CephString",
212 "desc": "Delete an API key with this name",
216 "cmd": "restful list-keys",
217 "desc": "List all API keys",
221 "cmd": "restful create-self-signed-cert",
222 "desc": "Create localized self signed certificate",
226 "cmd": "restful restart",
227 "desc": "Restart API server",
# Initialise the module state on top of the MgrModule base class.
232 def __init__(self, *args, **kwargs):
233 super(Module, self).__init__(*args, **kwargs)
# Held by submit_request() while appending to self.requests.
238 self.requests_lock = threading.RLock()
# Not read anywhere in the code visible here; presumably consulted by the
# API layer -- verify.
241 self.disable_auth = False
# Loop flag: the serve loop keeps running while this is False.
245 self.stop_server = False
# Waited on and cleared by the serve loop; set after self.server.shutdown().
246 self.serve_event = threading.Event()
250 while not self.stop_server:
253 self.server.socket.close()
254 except CannotServe as cs:
255 self.log.warn("server not running: {0}".format(cs.message))
257 self.log.error(str(traceback.format_exc()))
259 # Wait and clear the threading event
260 self.serve_event.wait()
261 self.serve_event.clear()
# Copy the API keys stored under the 'keys/' config prefix into self.keys,
# keyed by the bare key name.
263 def refresh_keys(self):
# Fall back to an empty dict when the lookup returns nothing.
265 rawkeys = self.get_config_prefix('keys/') or {}
# dict.iteritems() exists only on Python 2.
266 for k, v in rawkeys.iteritems():
267 self.keys[k[5:]] = v # strip off the 5-character 'keys/' prefix
270 # Load stored authentication keys
273 jsonify._instance = jsonify.GenericJSON(
276 separators=(',', ': '),
279 server_addr = self.get_localized_config('server_addr', '::')
280 if server_addr is None:
281 raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr <ip>"')
283 server_port = int(self.get_localized_config('server_port', '8003'))
284 self.log.info('server_addr: %s server_port: %d',
285 server_addr, server_port)
287 cert = self.get_localized_config("crt")
289 cert_tmp = tempfile.NamedTemporaryFile()
292 cert_fname = cert_tmp.name
294 cert_fname = self.get_localized_config('crt_file')
296 pkey = self.get_localized_config("key")
298 pkey_tmp = tempfile.NamedTemporaryFile()
301 pkey_fname = pkey_tmp.name
303 pkey_fname = self.get_localized_config('key_file')
305 if not cert_fname or not pkey_fname:
306 raise CannotServe('no certificate configured')
307 if not os.path.isfile(cert_fname):
308 raise CannotServe('certificate %s does not exist' % cert_fname)
309 if not os.path.isfile(pkey_fname):
310 raise CannotServe('private key %s does not exist' % pkey_fname)
312 # Publish the URI that others may use to access the service we're
313 # about to start serving
314 self.set_uri("https://{0}:{1}/".format(
315 socket.gethostname() if server_addr == "::" else server_addr,
319 # Create the HTTPS werkzeug server serving pecan app
320 self.server = make_server(
324 root='restful.api.Root',
325 hooks = [ErrorHook()], # use a callable if pecan >= 0.3.2
327 ssl_context=(cert_fname, pkey_fname),
330 self.server.serve_forever()
335 self.stop_server = True
337 self.server.shutdown()
338 self.serve_event.set()
340 self.log.error(str(traceback.format_exc()))
347 self.server.shutdown()
348 self.serve_event.set()
350 self.log.error(str(traceback.format_exc()))
# Entry point for notifications: delegates to _notify() and logs the current
# traceback at error level (presumably from an exception handler wrapped
# around the call -- confirm).
353 def notify(self, notify_type, tag):
355 self._notify(notify_type, tag)
357 self.log.error(str(traceback.format_exc()))
# Handle a "command" notification by locating the request that is currently
# running the command tagged `tag`; any other notification type is only
# logged at debug level.
360 def _notify(self, notify_type, tag):
361 if notify_type == "command":
362 # we can safely skip all the sequential commands
# Keep only the requests that report the tag as running.
367 lambda x: x.is_running(tag),
# Exactly one owning request is expected; anything else is reported.
370 if len(request) != 1:
371 self.log.warn("Unknown request '%s'" % str(tag))
# is_ready(): nothing running any more while batches are still waiting.
376 if request.is_ready():
379 self.log.debug("Unhandled notification type '%s'" % notify_type)
# Generate an RSA key pair and a self-signed certificate for it, both dumped
# in PEM format. The caller in handle_command() unpacks the result as
# (cert, pkey).
382 def create_self_signed_cert(self):
# 2048-bit RSA key.
385 pkey.generate_key(crypto.TYPE_RSA, 2048)
387 # create a self-signed cert
389 cert.get_subject().O = "IT"
390 cert.get_subject().CN = "ceph-restful"
# Random serial number taken from a fresh UUID4.
391 cert.set_serial_number(int(uuid4()))
# Valid from now ...
392 cert.gmtime_adj_notBefore(0)
# ... for 10 * 365 days (expressed in seconds).
393 cert.gmtime_adj_notAfter(10*365*24*60*60)
# Issuer equals subject: this is what makes the certificate self-signed.
394 cert.set_issuer(cert.get_subject())
395 cert.set_pubkey(pkey)
396 cert.sign(pkey, 'sha512')
399 crypto.dump_certificate(crypto.FILETYPE_PEM, cert),
400 crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
# Dispatch a "restful ..." command on command['prefix']: create-key,
# delete-key, list-keys, create-self-signed-cert and restart are handled;
# any other prefix yields a "Command not found" message.
404 def handle_command(self, command):
# NOTE(review): every handled command is logged at warn level -- confirm
# this verbosity is intended.
405 self.log.warn("Handling command: '%s'" % str(command))
406 if command['prefix'] == "restful create-key":
# An already existing key name returns the stored key instead of a new one.
407 if command['key_name'] in self.keys:
408 return 0, self.keys[command['key_name']], ""
# Keep the in-memory map and the persisted 'keys/<name>' entry in sync.
412 self.keys[command['key_name']] = key
413 self.set_config('keys/' + command['key_name'], key)
417 self.keys[command['key_name']],
421 elif command['prefix'] == "restful delete-key":
422 if command['key_name'] in self.keys:
423 del self.keys[command['key_name']]
# The persisted entry is overwritten with None.
424 self.set_config('keys/' + command['key_name'], None)
432 elif command['prefix'] == "restful list-keys":
436 json.dumps(self.keys, indent=2),
440 elif command['prefix'] == "restful create-self-signed-cert":
441 cert, pkey = self.create_self_signed_cert()
# Stored under keys prefixed with this manager's id.
443 self.set_config(self.get_mgr_id() + '/crt', cert)
444 self.set_config(self.get_mgr_id() + '/key', pkey)
449 "Restarting RESTful API server...",
453 elif command['prefix'] == 'restful restart':
457 "Restarting RESTful API server...",
465 "Command not found '{0}'".format(command['prefix'])
# Recursively collect API documentation into `doc`: for each URL prefix, the
# docstring lines of the get/post/patch/delete handlers found on `root`.
469 def get_doc_api(self, root, prefix=''):
# Descend into every attribute of `root` that is itself a RestController.
471 for _obj in dir(root):
472 obj = getattr(root, _obj)
474 if isinstance(obj, RestController):
475 doc.update(self.get_doc_api(obj, prefix + '/' + _obj))
# Probe _lookup with the placeholder argument '0'; a returned controller is
# documented under '<prefix>/<arg>'.
477 if getattr(root, '_lookup', None) and isinstance(root._lookup('0')[0], RestController):
478 doc.update(self.get_doc_api(root._lookup('0')[0], prefix + '/<arg>'))
# The root controller is documented under '/'.
480 prefix = prefix or '/'
483 for method in 'get', 'post', 'patch', 'delete':
484 if getattr(root, method, None):
# NOTE(review): inspect.getdoc() returns None for a handler without a
# docstring, in which case .split() raises AttributeError -- confirm every
# handler is documented.
485 doc[prefix][method.upper()] = inspect.getdoc(getattr(root, method)).split('\n')
487 if len(doc[prefix]) == 0:
494 mon_map_mons = self.get('mon_map')['mons']
495 mon_status = json.loads(self.get('mon_status')['json'])
497 # Add more information
498 for mon in mon_map_mons:
499 mon['in_quorum'] = mon['rank'] in mon_status['quorum']
500 mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
501 mon['leader'] = mon['rank'] == mon_status['quorum'][0]
# Build a mapping from OSD id to the list of pool ids whose CRUSH rule
# places data on that OSD.
506 def get_osd_pools(self):
# Every known OSD id starts with an empty pool list.
507 osds = dict(map(lambda x: (x['osd'], []), self.get('osd_map')['osds']))
# Pools keyed by pool id.
508 pools = dict(map(lambda x: (x['pool'], x), self.get('osd_map')['pools']))
509 crush_rules = self.get('osd_map_crush')['rules']
512 for pool_id, pool in pools.items():
# Only rules referenced by the pool, and whose size bounds include the
# pool's size, contribute OSDs.
514 for rule in [r for r in crush_rules if r['rule_id'] == pool['crush_rule']]:
515 if rule['min_size'] <= pool['size'] <= rule['max_size']:
516 pool_osds = common.crush_rule_osds(self.get('osd_map_tree')['nodes'], rule)
518 osds_by_pool[pool_id] = pool_osds
# Invert pool -> OSDs into OSD -> pools.
520 for pool_id in pools.keys():
521 for in_pool_id in osds_by_pool[pool_id]:
522 osds[in_pool_id].append(pool_id)
# Return OSD entries from the OSD map enriched with pools, host name,
# reweight and valid commands; `ids` and `pool_id` act as optional filters.
527 def get_osds(self, pool_id=None, ids=None):
529 osd_map = self.get('osd_map')
530 osd_metadata = self.get('osd_metadata')
532 # Update the data with the additional info from the osd map
533 osds = osd_map['osds']
# OSD ids are compared as strings, so `ids` is expected to hold strings.
538 lambda x: str(x['osd']) in ids,
542 # Get list of pools per osd node
543 pools_map = self.get_osd_pools()
545 # map osd IDs to reweight
546 reweight_map = dict([
547 (x.get('id'), x.get('reweight', None))
548 for x in self.get('osd_map_tree')['nodes']
551 # Build OSD data objects
553 osd['pools'] = pools_map[osd['osd']]
# Metadata is keyed by the string form of the OSD id; the host name is None
# when no metadata is available for the OSD.
554 osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None)
# OSDs missing from the tree nodes get a reweight of 0.0.
556 osd['reweight'] = reweight_map.get(osd['osd'], 0.0)
559 osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
561 osd['valid_commands'] = []
# Pool ids in osd['pools'] are compared against the int form of pool_id.
565 pool_id = int(pool_id)
567 lambda x: pool_id in x['pools'],
# Select the OSD map entries whose 'osd' field equals `osd_id`.
574 def get_osd_by_id(self, osd_id):
576 lambda x: x['osd'] == osd_id,
577 self.get('osd_map')['osds']
# Select the OSD map pool entries whose 'pool' field equals `pool_id`.
586 def get_pool_by_id(self, pool_id):
588 lambda x: x['pool'] == pool_id,
589 self.get('osd_map')['pools'],
# Wrap `_request` (a list of command batches) in a CommandsRequest, whose
# constructor already starts the first batch, and register it in
# self.requests. With a truthy `wait` keyword the call loops until the
# request reports is_finished().
598 def submit_request(self, _request, **kwargs):
599 request = CommandsRequest(_request)
# The request list is only modified while holding requests_lock.
600 with self.requests_lock:
601 self.requests.append(request)
602 if kwargs.get('wait', 0):
603 while not request.is_finished():
# Send a single JSON-encoded command to the 'mon' target under the fixed
# tag 'seq'.
608 def run_command(self, command):
609 # tag with 'seq' so that we can ignore these in notify function
610 result = CommandResult('seq')
612 self.send_command(result, 'mon', '', json.dumps(command), 'seq')