Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / pybind / mgr / restful / module.py
1 """
2 A RESTful API for Ceph
3 """
4
5 import os
6 import json
7 import time
8 import errno
9 import inspect
10 import tempfile
11 import threading
12 import traceback
13 import socket
14
15 import common
16
17 from uuid import uuid4
18 from pecan import jsonify, make_app
19 from OpenSSL import crypto
20 from pecan.rest import RestController
21 from werkzeug.serving import make_server, make_ssl_devcert
22
23 from hooks import ErrorHook
24 from mgr_module import MgrModule, CommandResult
25
# Global instance to share: assigned the Module object in Module.__init__
# and read by CommandsRequest.run to send commands.
instance = None
28
29
class CannotServe(Exception):
    """
    Raised by Module._serve when the API server cannot be started with the
    current configuration; Module.serve catches it and logs a warning.
    """
32
33
class CommandsRequest(object):
    """
    This class handles parallel as well as sequential execution of
    commands. The class accepts a list of iterables ("batches") that are
    executed sequentially. Each batch can contain several commands that
    are sent together and may finish in any order.

    Example:
    [[c1,c2],[c3,c4]]
     - run c1 and c2 in parallel
     - wait for them to finish
     - run c3 and c4 in parallel
     - wait for them to finish

    Attributes:
     - id:       string identifier, used as the prefix of command tags
     - running:  results of commands that were sent and did not finish yet
     - waiting:  batches of raw commands that were not sent yet
     - finished: results of commands that completed with r == 0
     - failed:   results of commands that completed with r != 0
     - lock:     reentrant lock guarding the lists above
    """

    def __init__(self, commands_arrays):
        """
        Store the batches and immediately send the first non-empty one.

        commands_arrays: list of batches, each batch being a sequence of
        commands (a command is serialized with json.dumps when sent).
        """
        self.id = str(id(self))

        # Filter out empty sub-requests. A real list is required because
        # the result is sliced and indexed below.
        commands_arrays = [
            batch for batch in commands_arrays if len(batch) != 0
        ]

        self.running = []
        self.waiting = commands_arrays[1:]
        self.finished = []
        self.failed = []

        self.lock = threading.RLock()
        if not commands_arrays:
            # Nothing to run
            return

        # Process first iteration of commands_arrays in parallel
        self.running.extend(self.run(commands_arrays[0]))

    def run(self, commands):
        """
        Send every command of the given batch through the global module
        instance and return the list of their (still pending) results.

        Each result is tagged '<request id>:<index in batch>' so that it
        can be matched again in finish() and is_running().
        """
        results = []
        for index, command in enumerate(commands):
            tag = '%s:%d' % (str(self.id), index)

            # Store the result
            result = CommandResult(tag)
            result.command = common.humanify_command(command)
            results.append(result)

            # Run the command
            instance.send_command(result, 'mon', '', json.dumps(command), tag)

        return results

    def next(self):
        """
        Send the next waiting batch, if there is one, and track its
        results as running.
        """
        with self.lock:
            if not self.waiting:
                # Nothing to run
                return

            # Run a next iteration of commands
            commands = self.waiting[0]
            self.waiting = self.waiting[1:]

            self.running.extend(self.run(commands))

    def finish(self, tag):
        """
        Move the running result with the given tag to 'finished' (when its
        return code r is 0) or to 'failed' (otherwise).

        Returns True if a running result carried the tag, False if not.
        """
        with self.lock:
            for index, result in enumerate(self.running):
                if result.tag != tag:
                    continue

                # Safe to mutate the list here: we return right away
                del self.running[index]
                if result.r == 0:
                    self.finished.append(result)
                else:
                    self.failed.append(result)
                return True

            # No such tag found
            return False

    def is_running(self, tag):
        """Return True if a running result carries the given tag."""
        return any(result.tag == tag for result in self.running)

    def is_ready(self):
        """
        Return True if nothing is running and at least one batch is still
        waiting, i.e. next() can be called.
        """
        with self.lock:
            # bool() so that the waiting list itself is never handed out
            return not self.running and bool(self.waiting)

    def is_waiting(self):
        """Return True if at least one batch was not sent yet."""
        return bool(self.waiting)

    def is_finished(self):
        """Return True if nothing is running and nothing is waiting."""
        with self.lock:
            return not self.running and not self.waiting

    def has_failed(self):
        """Return True if at least one command ended up in 'failed'."""
        return bool(self.failed)

    def get_state(self):
        """Return "pending", "failed" or "success" for the whole request."""
        with self.lock:
            if not self.is_finished():
                return "pending"

            if self.has_failed():
                return "failed"

            return "success"

    @staticmethod
    def _result_to_dict(result):
        """Return the serializable fields of one command result."""
        return {
            'command': result.command,
            'outs': result.outs,
            'outb': result.outb,
        }

    def __json__(self):
        """
        Return a plain dict describing the request.

        'running', 'finished' and 'failed' are lists of result dicts.
        'waiting' holds raw commands that have no result object yet, so it
        is reported as a list of batches, each a list of humanified
        commands.
        """
        describe = self._result_to_dict
        return {
            'id': self.id,
            'running': [describe(x) for x in self.running],
            'finished': [describe(x) for x in self.finished],
            'waiting': [
                [common.humanify_command(command) for command in batch]
                for batch in self.waiting
            ],
            'failed': [describe(x) for x in self.failed],
            'is_waiting': self.is_waiting(),
            'is_finished': self.is_finished(),
            'has_failed': self.has_failed(),
            'state': self.get_state(),
        }
200
201
202
class Module(MgrModule):
    """
    Manager module that serves the RESTful API over HTTPS.

    It keeps the submitted CommandsRequest objects, the API keys loaded
    from the module configuration and the werkzeug server hosting the
    pecan application.
    """

    COMMANDS = [
        {
            "cmd": "restful create-key name=key_name,type=CephString",
            "desc": "Create an API key with this name",
            "perm": "rw"
        },
        {
            "cmd": "restful delete-key name=key_name,type=CephString",
            "desc": "Delete an API key with this name",
            "perm": "rw"
        },
        {
            "cmd": "restful list-keys",
            "desc": "List all API keys",
            "perm": "rw"
        },
        {
            "cmd": "restful create-self-signed-cert",
            "desc": "Create localized self signed certificate",
            "perm": "rw"
        },
        {
            "cmd": "restful restart",
            "desc": "Restart API server",
            "perm": "rw"
        },
    ]

    def __init__(self, *args, **kwargs):
        """Initialize the module state and publish it as the global instance."""
        super(Module, self).__init__(*args, **kwargs)
        global instance
        instance = self

        # Submitted CommandsRequest objects; appended under requests_lock
        self.requests = []
        self.requests_lock = threading.RLock()

        # API keys by name, (re)loaded by refresh_keys()
        self.keys = {}
        self.disable_auth = False

        # werkzeug server, created by _serve()
        self.server = None

        # stop_server ends the serve() loop; serve_event wakes it up
        self.stop_server = False
        self.serve_event = threading.Event()

    def serve(self):
        """
        Run the API server until shutdown() is called.

        Every pass starts the server with the current configuration and
        blocks while it is serving. Once the server has stopped, or could
        not be started, the loop sleeps until restart() or shutdown() sets
        serve_event.
        """
        while not self.stop_server:
            try:
                self._serve()
                self.server.socket.close()
            except CannotServe as cs:
                # str() instead of the Python 2 only 'message' attribute
                self.log.warn("server not running: {0}".format(str(cs)))
            except Exception:
                self.log.error(str(traceback.format_exc()))

            # Wait and clear the threading event
            self.serve_event.wait()
            self.serve_event.clear()

    def refresh_keys(self):
        """Reload self.keys from the config entries stored under 'keys/'."""
        rawkeys = self.get_config_prefix('keys/') or {}

        # Build the new mapping first and swap it in with one assignment,
        # so that self.keys is never observed half-filled.
        keys = {}
        for k, v in rawkeys.items():
            keys[k[5:]] = v  # strip of keys/ prefix
        self.keys = keys

    def _config_to_file(self, value_key, path_key, tmp_files):
        """
        Return the name of a file holding one TLS credential.

        If the localized config option `value_key` is set, its content is
        written to a new temporary file; that file object is appended to
        `tmp_files` so the caller can close it (closing deletes it).
        Otherwise the value of the localized option `path_key` is returned
        as is (it may be None).
        """
        content = self.get_localized_config(value_key)
        if content is None:
            return self.get_localized_config(path_key)

        if not isinstance(content, bytes):
            # NamedTemporaryFile is opened in binary mode by default
            content = content.encode('utf-8')

        tmp = tempfile.NamedTemporaryFile()
        tmp_files.append(tmp)
        tmp.write(content)
        tmp.flush()
        return tmp.name

    def _serve(self):
        """
        Configure and run the HTTPS server; blocks until it is shut down.

        Raises CannotServe when the address or the certificate/private key
        configuration is missing or points to non-existing files.
        """
        # Load stored authentication keys
        self.refresh_keys()

        jsonify._instance = jsonify.GenericJSON(
            sort_keys=True,
            indent=4,
            separators=(',', ': '),
        )

        server_addr = self.get_localized_config('server_addr', '::')
        if server_addr is None:
            raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr <ip>"')

        server_port = int(self.get_localized_config('server_port', '8003'))
        self.log.info('server_addr: %s server_port: %d',
                      server_addr, server_port)

        # Temporary files must outlive the server, hence the try/finally
        # around everything that uses their names.
        tmp_files = []
        try:
            cert_fname = self._config_to_file('crt', 'crt_file', tmp_files)
            pkey_fname = self._config_to_file('key', 'key_file', tmp_files)

            if not cert_fname or not pkey_fname:
                raise CannotServe('no certificate configured')
            if not os.path.isfile(cert_fname):
                raise CannotServe('certificate %s does not exist' % cert_fname)
            if not os.path.isfile(pkey_fname):
                raise CannotServe('private key %s does not exist' % pkey_fname)

            # Publish the URI that others may use to access the service we're
            # about to start serving
            self.set_uri("https://{0}:{1}/".format(
                socket.gethostname() if server_addr == "::" else server_addr,
                server_port
            ))

            # Create the HTTPS werkzeug server serving pecan app
            self.server = make_server(
                host=server_addr,
                port=server_port,
                app=make_app(
                    root='restful.api.Root',
                    hooks=[ErrorHook()],  # use a callable if pecan >= 0.3.2
                ),
                ssl_context=(cert_fname, pkey_fname),
            )

            self.server.serve_forever()
        finally:
            for tmp in tmp_files:
                tmp.close()

    def shutdown(self):
        """
        Stop the server for good: end the serve() loop and wake it up.

        Errors are logged and re-raised.
        """
        try:
            self.stop_server = True
            if self.server:
                self.server.shutdown()
            self.serve_event.set()
        except Exception:
            self.log.error(str(traceback.format_exc()))
            raise

    def restart(self):
        """
        Stop the running server (if any) and wake up serve() so that it
        starts a new one. Errors are logged and not propagated.
        """
        try:
            if self.server:
                self.server.shutdown()
            self.serve_event.set()
        except Exception:
            self.log.error(str(traceback.format_exc()))

    def notify(self, notify_type, tag):
        """Handle a notification; errors are logged and not propagated."""
        try:
            self._notify(notify_type, tag)
        except Exception:
            self.log.error(str(traceback.format_exc()))

    def _notify(self, notify_type, tag):
        """
        Process a "command" notification: mark the tagged command of the
        owning request as finished and start the next batch once the
        request is ready. Other notification types are only logged.
        """
        if notify_type != "command":
            self.log.debug("Unhandled notification type '%s'" % notify_type)
            return

        # we can safely skip all the sequential commands
        if tag == 'seq':
            return

        matching = [x for x in self.requests if x.is_running(tag)]
        if len(matching) != 1:
            self.log.warn("Unknown request '%s'" % str(tag))
            return

        request = matching[0]
        request.finish(tag)
        if request.is_ready():
            request.next()

    def create_self_signed_cert(self):
        """
        Generate a 2048 bit RSA key and a self-signed certificate that is
        valid for ten years.

        Returns a (certificate, private key) tuple, both PEM encoded.
        """
        # create a key pair
        pkey = crypto.PKey()
        pkey.generate_key(crypto.TYPE_RSA, 2048)

        # create a self-signed cert
        cert = crypto.X509()
        cert.get_subject().O = "IT"
        cert.get_subject().CN = "ceph-restful"
        cert.set_serial_number(int(uuid4()))
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10*365*24*60*60)
        cert.set_issuer(cert.get_subject())
        cert.set_pubkey(pkey)
        cert.sign(pkey, 'sha512')

        return (
            crypto.dump_certificate(crypto.FILETYPE_PEM, cert),
            crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
        )

    def handle_command(self, command):
        """
        Execute one of the commands declared in COMMANDS.

        command: dict with at least the key 'prefix' (plus 'key_name' for
        the key commands).

        Returns a (return code, output, error message) tuple; unknown
        prefixes yield -errno.EINVAL.
        """
        self.log.warn("Handling command: '%s'" % str(command))
        prefix = command['prefix']

        if prefix == "restful create-key":
            key_name = command['key_name']
            # An already existing key is returned unchanged
            if key_name not in self.keys:
                key = str(uuid4())
                self.keys[key_name] = key
                self.set_config('keys/' + key_name, key)

            return (
                0,
                self.keys[key_name],
                "",
            )

        elif prefix == "restful delete-key":
            key_name = command['key_name']
            if key_name in self.keys:
                del self.keys[key_name]
                self.set_config('keys/' + key_name, None)

            return (
                0,
                "",
                "",
            )

        elif prefix == "restful list-keys":
            self.refresh_keys()
            return (
                0,
                json.dumps(self.keys, indent=2),
                "",
            )

        elif prefix == "restful create-self-signed-cert":
            cert, pkey = self.create_self_signed_cert()

            self.set_config(self.get_mgr_id() + '/crt', cert)
            self.set_config(self.get_mgr_id() + '/key', pkey)

            self.restart()
            return (
                0,
                "Restarting RESTful API server...",
                ""
            )

        elif prefix == 'restful restart':
            self.restart()
            return (
                0,
                "Restarting RESTful API server...",
                ""
            )

        else:
            return (
                -errno.EINVAL,
                "",
                "Command not found '{0}'".format(prefix)
            )

    def get_doc_api(self, root, prefix=''):
        """
        Recursively collect the docstrings of the REST handlers below root.

        Returns a dict mapping an URL path to a dict of
        {HTTP method: docstring split into lines} for every controller
        that has at least one of the get/post/patch/delete handlers.
        """
        doc = {}
        for _obj in dir(root):
            obj = getattr(root, _obj)

            if isinstance(obj, RestController):
                doc.update(self.get_doc_api(obj, prefix + '/' + _obj))

        if getattr(root, '_lookup', None):
            # Probe the lookup with a dummy argument, only once
            target = root._lookup('0')[0]
            if isinstance(target, RestController):
                doc.update(self.get_doc_api(target, prefix + '/<arg>'))

        prefix = prefix or '/'

        methods = {}
        for method in 'get', 'post', 'patch', 'delete':
            handler = getattr(root, method, None)
            if handler:
                # inspect.getdoc returns None for undocumented handlers
                methods[method.upper()] = (inspect.getdoc(handler) or '').split('\n')

        if methods:
            doc[prefix] = methods

        return doc

    def get_mons(self):
        """
        Return the monitors of the mon map, each extended in place with
        'in_quorum', 'server' (hostname from the metadata) and 'leader'.
        """
        mon_map_mons = self.get('mon_map')['mons']
        mon_status = json.loads(self.get('mon_status')['json'])

        quorum = mon_status['quorum']
        # The first quorum member is the leader; nobody leads an empty quorum
        leader_rank = quorum[0] if quorum else None

        # Add more information
        for mon in mon_map_mons:
            mon['in_quorum'] = mon['rank'] in quorum
            mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
            mon['leader'] = mon['rank'] == leader_rank

        return mon_map_mons

    def get_osd_pools(self):
        """
        Return a dict mapping every OSD id of the osd map to the list of
        ids of the pools whose crush rule selects that OSD.
        """
        osd_map = self.get('osd_map')
        osds = dict((x['osd'], []) for x in osd_map['osds'])
        pools = dict((x['pool'], x) for x in osd_map['pools'])
        crush_rules = self.get('osd_map_crush')['rules']

        osds_by_pool = {}
        for pool_id, pool in pools.items():
            # A pool without a matching rule simply has no OSDs
            pool_osds = []
            for rule in crush_rules:
                if rule['rule_id'] != pool['crush_rule']:
                    continue
                if rule['min_size'] <= pool['size'] <= rule['max_size']:
                    pool_osds = common.crush_rule_osds(self.get('osd_map_tree')['nodes'], rule)

            osds_by_pool[pool_id] = pool_osds

        for pool_id, pool_osds in osds_by_pool.items():
            for in_pool_id in pool_osds:
                osds[in_pool_id].append(pool_id)

        return osds

    def get_osds(self, pool_id=None, ids=None):
        """
        Return the OSDs of the osd map, each extended in place with
        'pools', 'server', 'reweight' and 'valid_commands'.

        pool_id: if given, only OSDs serving this pool are returned; the
                 value is converted with int().
        ids:     if given, only OSDs whose id (as a string) is contained
                 in it are returned.
        """
        # Get data
        osd_map = self.get('osd_map')
        osd_metadata = self.get('osd_metadata')

        # Update the data with the additional info from the osd map
        osds = osd_map['osds']

        # Filter by osd ids
        if ids is not None:
            osds = [x for x in osds if str(x['osd']) in ids]

        # Get list of pools per osd node
        pools_map = self.get_osd_pools()

        # map osd IDs to reweight
        reweight_map = dict(
            (x.get('id'), x.get('reweight', None))
            for x in self.get('osd_map_tree')['nodes']
        )

        # Build OSD data objects
        for osd in osds:
            osd['pools'] = pools_map[osd['osd']]
            osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None)

            osd['reweight'] = reweight_map.get(osd['osd'], 0.0)

            if osd['up']:
                osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
            else:
                osd['valid_commands'] = []

        # Filter by pool; test against None/'' explicitly so that the
        # integer pool id 0 is not mistaken for "no filter"
        if pool_id is not None and pool_id != '':
            pool_id = int(pool_id)
            osds = [x for x in osds if pool_id in x['pools']]

        return osds

    def get_osd_by_id(self, osd_id):
        """
        Return the osd map entry with the given id, or None unless exactly
        one entry matches.
        """
        matching = [
            x for x in self.get('osd_map')['osds'] if x['osd'] == osd_id
        ]

        if len(matching) != 1:
            return None

        return matching[0]

    def get_pool_by_id(self, pool_id):
        """
        Return the osd map pool with the given id, or None unless exactly
        one pool matches.
        """
        matching = [
            x for x in self.get('osd_map')['pools'] if x['pool'] == pool_id
        ]

        if len(matching) != 1:
            return None

        return matching[0]

    def submit_request(self, _request, **kwargs):
        """
        Create a CommandsRequest from the given batches, register it and
        return it.

        If the keyword argument 'wait' is truthy, block (polling) until
        the request is finished.
        """
        request = CommandsRequest(_request)
        with self.requests_lock:
            self.requests.append(request)
        if kwargs.get('wait', 0):
            while not request.is_finished():
                time.sleep(0.001)
        return request

    def run_command(self, command):
        """
        Send a single command to the mon, wait for it and return the
        value of CommandResult.wait().
        """
        # tag with 'seq' so that we can ignore these in notify function
        result = CommandResult('seq')

        self.send_command(result, 'mon', '', json.dumps(command), 'seq')
        return result.wait()