--- /dev/null
+from mgr_module import MgrModule, CommandResult
+import json
+import threading
+
+class Module(MgrModule):
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.serve_event = threading.Event()
+
+ def notify(self, notify_type, notify_id):
+ if notify_type == 'osd_map':
+ self.handle_osd_map()
+
+ def handle_osd_map(self):
+ """
+ Check pools on each OSDMap change
+ """
+ subtree_type = self.get_config('subtree') or 'rack'
+ failure_domain = self.get_config('failure_domain') or 'host'
+ pg_num = self.get_config('pg_num') or '128'
+ num_rep = self.get_config('num_rep') or '3'
+ min_size = self.get_config('min_size')
+ prefix = self.get_config('prefix') or 'by-' + subtree_type + '-'
+
+ osdmap = self.get("osd_map")
+ lpools = []
+ for pool in osdmap['pools']:
+ if pool['pool_name'].find(prefix) == 0:
+ lpools.append(pool['pool_name'])
+
+ self.log.debug('localized pools = %s', lpools)
+ subtrees = []
+ tree = self.get('osd_map_tree')
+ for node in tree['nodes']:
+ if node['type'] == subtree_type:
+ subtrees.append(node['name'])
+ pool_name = prefix + node['name']
+ if pool_name not in lpools:
+ self.log.info('Creating localized pool %s', pool_name)
+ #
+ result = CommandResult("")
+ self.send_command(result, "mon", "", json.dumps({
+ "prefix": "osd crush rule create-replicated",
+ "format": "json",
+ "name": pool_name,
+ "root": node['name'],
+ "type": failure_domain,
+ }), "")
+ r, outb, outs = result.wait()
+
+ result = CommandResult("")
+ self.send_command(result, "mon", "", json.dumps({
+ "prefix": "osd pool create",
+ "format": "json",
+ "pool": pool_name,
+ 'rule': pool_name,
+ 'erasure_code_profile': pool_name,
+ "pool_type": 'replicated',
+ 'pg_num': str(pg_num),
+ }), "")
+ r, outb, outs = result.wait()
+
+ result = CommandResult("")
+ self.send_command(result, "mon", "", json.dumps({
+ "prefix": "osd pool set",
+ "format": "json",
+ "pool": pool_name,
+ 'var': 'size',
+ "val": str(num_rep),
+ }), "")
+ r, outb, outs = result.wait()
+
+ if min_size:
+ result = CommandResult("")
+ self.send_command(result, "mon", "", json.dumps({
+ "prefix": "osd pool set",
+ "format": "json",
+ "pool": pool_name,
+ 'var': 'min_size',
+ "val": str(min_size),
+ }), "")
+ r, outb, outs = result.wait()
+
+ # TODO remove pools for hosts that don't exist?
+
+ def serve(self):
+ self.handle_osd_map()
+ self.serve_event.wait()
+ self.serve_event.clear()
+
+ def shutdown(self):
+ self.serve_event.set()