remove ceph code
[stor4nfv.git] / src / ceph / src / pybind / mgr / balancer / module.py
diff --git a/src/ceph/src/pybind/mgr/balancer/module.py b/src/ceph/src/pybind/mgr/balancer/module.py
deleted file mode 100644 (file)
index b8cc087..0000000
+++ /dev/null
@@ -1,933 +0,0 @@
-
-"""
-Balance PG distribution across OSDs.
-"""
-
-import copy
-import errno
-import json
-import math
-import random
-import time
-from mgr_module import MgrModule, CommandResult
-from threading import Event
-
-# available modes: 'none', 'crush', 'crush-compat', 'upmap', 'osd_weight'
-default_mode = 'none'
-default_sleep_interval = 60   # seconds
-default_max_misplaced = .05    # max ratio of pgs replaced at a time
-
-TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
-
-
-class MappingState:
-    def __init__(self, osdmap, pg_dump, desc=''):
-        self.desc = desc
-        self.osdmap = osdmap
-        self.osdmap_dump = self.osdmap.dump()
-        self.crush = osdmap.get_crush()
-        self.crush_dump = self.crush.dump()
-        self.pg_dump = pg_dump
-        self.pg_stat = {
-            i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
-        }
-        self.poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
-        self.pg_up = {}
-        self.pg_up_by_poolid = {}
-        for poolid in self.poolids:
-            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
-            for a,b in self.pg_up_by_poolid[poolid].iteritems():
-                self.pg_up[a] = b
-
-    def calc_misplaced_from(self, other_ms):
-        num = len(other_ms.pg_up)
-        misplaced = 0
-        for pgid, before in other_ms.pg_up.iteritems():
-            if before != self.pg_up.get(pgid, []):
-                misplaced += 1
-        if num > 0:
-            return float(misplaced) / float(num)
-        return 0.0
-
-class Plan:
-    def __init__(self, name, ms):
-        self.mode = 'unknown'
-        self.name = name
-        self.initial = ms
-
-        self.osd_weights = {}
-        self.compat_ws = {}
-        self.inc = ms.osdmap.new_incremental()
-
-    def final_state(self):
-        self.inc.set_osd_reweights(self.osd_weights)
-        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
-        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
-                            self.initial.pg_dump,
-                            'plan %s final' % self.name)
-
-    def dump(self):
-        return json.dumps(self.inc.dump(), indent=4)
-
-    def show(self):
-        ls = []
-        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
-        ls.append('# starting crush version %d' %
-                  self.initial.osdmap.get_crush_version())
-        ls.append('# mode %s' % self.mode)
-        if len(self.compat_ws) and \
-           '-1' not in self.initial.crush_dump.get('choose_args', {}):
-            ls.append('ceph osd crush weight-set create-compat')
-        for osd, weight in self.compat_ws.iteritems():
-            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
-                      (osd, weight))
-        for osd, weight in self.osd_weights.iteritems():
-            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
-        incdump = self.inc.dump()
-        for pgid in incdump.get('old_pg_upmap_items', []):
-            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
-        for item in incdump.get('new_pg_upmap_items', []):
-            osdlist = []
-            for m in item['mappings']:
-                osdlist += [m['from'], m['to']]
-            ls.append('ceph osd pg-upmap-items %s %s' %
-                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
-        return '\n'.join(ls)
-
-
-class Eval:
-    root_ids = {}        # root name -> id
-    pool_name = {}       # pool id -> pool name
-    pool_id = {}         # pool name -> id
-    pool_roots = {}      # pool name -> root name
-    root_pools = {}      # root name -> pools
-    target_by_root = {}  # root name -> target weight map
-    count_by_pool = {}
-    count_by_root = {}
-    actual_by_pool = {}  # pool -> by_* -> actual weight map
-    actual_by_root = {}  # pool -> by_* -> actual weight map
-    total_by_pool = {}   # pool -> by_* -> total
-    total_by_root = {}   # root -> by_* -> total
-    stats_by_pool = {}   # pool -> by_* -> stddev or avg -> value
-    stats_by_root = {}   # root -> by_* -> stddev or avg -> value
-
-    score_by_pool = {}
-    score_by_root = {}
-
-    score = 0.0
-
-    def __init__(self, ms):
-        self.ms = ms
-
-    def show(self, verbose=False):
-        if verbose:
-            r = self.ms.desc + '\n'
-            r += 'target_by_root %s\n' % self.target_by_root
-            r += 'actual_by_pool %s\n' % self.actual_by_pool
-            r += 'actual_by_root %s\n' % self.actual_by_root
-            r += 'count_by_pool %s\n' % self.count_by_pool
-            r += 'count_by_root %s\n' % self.count_by_root
-            r += 'total_by_pool %s\n' % self.total_by_pool
-            r += 'total_by_root %s\n' % self.total_by_root
-            r += 'stats_by_root %s\n' % self.stats_by_root
-            r += 'score_by_pool %s\n' % self.score_by_pool
-            r += 'score_by_root %s\n' % self.score_by_root
-        else:
-            r = self.ms.desc + ' '
-        r += 'score %f (lower is better)\n' % self.score
-        return r
-
-    def calc_stats(self, count, target, total):
-        num = max(len(target), 1)
-        r = {}
-        for t in ('pgs', 'objects', 'bytes'):
-            avg = float(total[t]) / float(num)
-            dev = 0.0
-
-            # score is a measure of how uneven the data distribution is.
-            # score lies between [0, 1), 0 means perfect distribution.
-            score = 0.0
-            sum_weight = 0.0
-
-            for k, v in count[t].iteritems():
-                # adjust/normalize by weight
-                if target[k]:
-                    adjusted = float(v) / target[k] / float(num)
-                else:
-                    adjusted = 0.0
-
-                # Overweighted devices and their weights are factors to calculate reweight_urgency.
-                # One 10% underfilled device with 5 2% overfilled devices, is arguably a better
-                # situation than one 10% overfilled with 5 2% underfilled devices
-                if adjusted > avg:
-                    '''
-                    F(x) = 2*phi(x) - 1, where phi(x) = cdf of standard normal distribution
-                    x = (adjusted - avg)/avg.
-                    Since, we're considering only over-weighted devices, x >= 0, and so phi(x) lies in [0.5, 1).
-                    To bring range of F(x) in range [0, 1), we need to make the above modification.
-
-                    In general, we need to use a function F(x), where x = (adjusted - avg)/avg
-                    1. which is bounded between 0 and 1, so that ultimately reweight_urgency will also be bounded.
-                    2. A larger value of x, should imply more urgency to reweight.
-                    3. Also, the difference between F(x) when x is large, should be minimal.
-                    4. The value of F(x) should get close to 1 (highest urgency to reweight) with steeply.
-
-                    Could have used F(x) = (1 - e^(-x)). But that had slower convergence to 1, compared to the one currently in use.
-
-                    cdf of standard normal distribution: https://stackoverflow.com/a/29273201
-                    '''
-                    score += target[k] * (math.erf(((adjusted - avg)/avg) / math.sqrt(2.0)))
-                    sum_weight += target[k]
-                dev += (avg - adjusted) * (avg - adjusted)
-            stddev = math.sqrt(dev / float(max(num - 1, 1)))
-            score = score / max(sum_weight, 1)
-            r[t] = {
-                'avg': avg,
-                'stddev': stddev,
-                'sum_weight': sum_weight,
-                'score': score,
-            }
-        return r
-
-class Module(MgrModule):
-    COMMANDS = [
-        {
-            "cmd": "balancer status",
-            "desc": "Show balancer status",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
-            "desc": "Set balancer mode",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer on",
-            "desc": "Enable automatic balancing",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer off",
-            "desc": "Disable automatic balancing",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer eval name=plan,type=CephString,req=false",
-            "desc": "Evaluate data distribution for the current cluster or specific plan",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer eval-verbose name=plan,type=CephString,req=false",
-            "desc": "Evaluate data distribution for the current cluster or specific plan (verbosely)",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer optimize name=plan,type=CephString",
-            "desc": "Run optimizer to create a new plan",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer show name=plan,type=CephString",
-            "desc": "Show details of an optimization plan",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer rm name=plan,type=CephString",
-            "desc": "Discard an optimization plan",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer reset",
-            "desc": "Discard all optimization plans",
-            "perm": "rw",
-        },
-        {
-            "cmd": "balancer dump name=plan,type=CephString",
-            "desc": "Show an optimization plan",
-            "perm": "r",
-        },
-        {
-            "cmd": "balancer execute name=plan,type=CephString",
-            "desc": "Execute an optimization plan",
-            "perm": "r",
-        },
-    ]
-    active = False
-    run = True
-    plans = {}
-    mode = ''
-
-    def __init__(self, *args, **kwargs):
-        super(Module, self).__init__(*args, **kwargs)
-        self.event = Event()
-
-    def handle_command(self, command):
-        self.log.warn("Handling command: '%s'" % str(command))
-        if command['prefix'] == 'balancer status':
-            s = {
-                'plans': self.plans.keys(),
-                'active': self.active,
-                'mode': self.get_config('mode', default_mode),
-            }
-            return (0, json.dumps(s, indent=4), '')
-        elif command['prefix'] == 'balancer mode':
-            self.set_config('mode', command['mode'])
-            return (0, '', '')
-        elif command['prefix'] == 'balancer on':
-            if not self.active:
-                self.set_config('active', '1')
-                self.active = True
-            self.event.set()
-            return (0, '', '')
-        elif command['prefix'] == 'balancer off':
-            if self.active:
-                self.set_config('active', '')
-                self.active = False
-            self.event.set()
-            return (0, '', '')
-        elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
-            verbose = command['prefix'] == 'balancer eval-verbose'
-            if 'plan' in command:
-                plan = self.plans.get(command['plan'])
-                if not plan:
-                    return (-errno.ENOENT, '', 'plan %s not found' %
-                            command['plan'])
-                ms = plan.final_state()
-            else:
-                ms = MappingState(self.get_osdmap(),
-                                  self.get("pg_dump"),
-                                  'current cluster')
-            return (0, self.evaluate(ms, verbose=verbose), '')
-        elif command['prefix'] == 'balancer optimize':
-            plan = self.plan_create(command['plan'])
-            self.optimize(plan)
-            return (0, '', '')
-        elif command['prefix'] == 'balancer rm':
-            self.plan_rm(command['name'])
-            return (0, '', '')
-        elif command['prefix'] == 'balancer reset':
-            self.plans = {}
-            return (0, '', '')
-        elif command['prefix'] == 'balancer dump':
-            plan = self.plans.get(command['plan'])
-            if not plan:
-                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
-            return (0, plan.dump(), '')
-        elif command['prefix'] == 'balancer show':
-            plan = self.plans.get(command['plan'])
-            if not plan:
-                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
-            return (0, plan.show(), '')
-        elif command['prefix'] == 'balancer execute':
-            plan = self.plans.get(command['plan'])
-            if not plan:
-                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
-            self.execute(plan)
-            self.plan_rm(plan)
-            return (0, '', '')
-        else:
-            return (-errno.EINVAL, '',
-                    "Command not found '{0}'".format(command['prefix']))
-
-    def shutdown(self):
-        self.log.info('Stopping')
-        self.run = False
-        self.event.set()
-
-    def time_in_interval(self, tod, begin, end):
-        if begin <= end:
-            return tod >= begin and tod < end
-        else:
-            return tod >= begin or tod < end
-
-    def serve(self):
-        self.log.info('Starting')
-        while self.run:
-            self.active = self.get_config('active', '') is not ''
-            begin_time = self.get_config('begin_time') or '0000'
-            end_time = self.get_config('end_time') or '2400'
-            timeofday = time.strftime('%H%M', time.localtime())
-            self.log.debug('Waking up [%s, scheduled for %s-%s, now %s]',
-                           "active" if self.active else "inactive",
-                           begin_time, end_time, timeofday)
-            sleep_interval = float(self.get_config('sleep_interval',
-                                                   default_sleep_interval))
-            if self.active and self.time_in_interval(timeofday, begin_time, end_time):
-                self.log.debug('Running')
-                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
-                plan = self.plan_create(name)
-                if self.optimize(plan):
-                    self.execute(plan)
-                self.plan_rm(name)
-            self.log.debug('Sleeping for %d', sleep_interval)
-            self.event.wait(sleep_interval)
-            self.event.clear()
-
-    def plan_create(self, name):
-        plan = Plan(name, MappingState(self.get_osdmap(),
-                                       self.get("pg_dump"),
-                                       'plan %s initial' % name))
-        self.plans[name] = plan
-        return plan
-
-    def plan_rm(self, name):
-        if name in self.plans:
-            del self.plans[name]
-
-    def calc_eval(self, ms):
-        pe = Eval(ms)
-        pool_rule = {}
-        pool_info = {}
-        for p in ms.osdmap_dump.get('pools',[]):
-            pe.pool_name[p['pool']] = p['pool_name']
-            pe.pool_id[p['pool_name']] = p['pool']
-            pool_rule[p['pool_name']] = p['crush_rule']
-            pe.pool_roots[p['pool_name']] = []
-            pool_info[p['pool_name']] = p
-        pools = pe.pool_id.keys()
-        if len(pools) == 0:
-            return pe
-        self.log.debug('pool_name %s' % pe.pool_name)
-        self.log.debug('pool_id %s' % pe.pool_id)
-        self.log.debug('pools %s' % pools)
-        self.log.debug('pool_rule %s' % pool_rule)
-
-        osd_weight = { a['osd']: a['weight']
-                       for a in ms.osdmap_dump.get('osds',[]) }
-
-        # get expected distributions by root
-        actual_by_root = {}
-        rootids = ms.crush.find_takes()
-        roots = []
-        for rootid in rootids:
-            root = ms.crush.get_item_name(rootid)
-            pe.root_ids[root] = rootid
-            roots.append(root)
-            ls = ms.osdmap.get_pools_by_take(rootid)
-            pe.root_pools[root] = []
-            for poolid in ls:
-                pe.pool_roots[pe.pool_name[poolid]].append(root)
-                pe.root_pools[root].append(pe.pool_name[poolid])
-            weight_map = ms.crush.get_take_weight_osd_map(rootid)
-            adjusted_map = {
-                osd: cw * osd_weight.get(osd, 1.0)
-                for osd,cw in weight_map.iteritems()
-            }
-            sum_w = sum(adjusted_map.values()) or 1.0
-            pe.target_by_root[root] = { osd: w / sum_w
-                                        for osd,w in adjusted_map.iteritems() }
-            actual_by_root[root] = {
-                'pgs': {},
-                'objects': {},
-                'bytes': {},
-            }
-            for osd in pe.target_by_root[root].iterkeys():
-                actual_by_root[root]['pgs'][osd] = 0
-                actual_by_root[root]['objects'][osd] = 0
-                actual_by_root[root]['bytes'][osd] = 0
-            pe.total_by_root[root] = {
-                'pgs': 0,
-                'objects': 0,
-                'bytes': 0,
-            }
-        self.log.debug('pool_roots %s' % pe.pool_roots)
-        self.log.debug('root_pools %s' % pe.root_pools)
-        self.log.debug('target_by_root %s' % pe.target_by_root)
-
-        # pool and root actual
-        for pool, pi in pool_info.iteritems():
-            poolid = pi['pool']
-            pm = ms.pg_up_by_poolid[poolid]
-            pgs = 0
-            objects = 0
-            bytes = 0
-            pgs_by_osd = {}
-            objects_by_osd = {}
-            bytes_by_osd = {}
-            for root in pe.pool_roots[pool]:
-                for osd in pe.target_by_root[root].iterkeys():
-                    pgs_by_osd[osd] = 0
-                    objects_by_osd[osd] = 0
-                    bytes_by_osd[osd] = 0
-            for pgid, up in pm.iteritems():
-                for osd in [int(osd) for osd in up]:
-                    pgs_by_osd[osd] += 1
-                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
-                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
-                    # pick a root to associate this pg instance with.
-                    # note that this is imprecise if the roots have
-                    # overlapping children.
-                    # FIXME: divide bytes by k for EC pools.
-                    for root in pe.pool_roots[pool]:
-                        if osd in pe.target_by_root[root]:
-                            actual_by_root[root]['pgs'][osd] += 1
-                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
-                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
-                            pgs += 1
-                            objects += ms.pg_stat[pgid]['num_objects']
-                            bytes += ms.pg_stat[pgid]['num_bytes']
-                            pe.total_by_root[root]['pgs'] += 1
-                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
-                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
-                            break
-            pe.count_by_pool[pool] = {
-                'pgs': {
-                    k: v
-                    for k, v in pgs_by_osd.iteritems()
-                },
-                'objects': {
-                    k: v
-                    for k, v in objects_by_osd.iteritems()
-                },
-                'bytes': {
-                    k: v
-                    for k, v in bytes_by_osd.iteritems()
-                },
-            }
-            pe.actual_by_pool[pool] = {
-                'pgs': {
-                    k: float(v) / float(max(pgs, 1))
-                    for k, v in pgs_by_osd.iteritems()
-                },
-                'objects': {
-                    k: float(v) / float(max(objects, 1))
-                    for k, v in objects_by_osd.iteritems()
-                },
-                'bytes': {
-                    k: float(v) / float(max(bytes, 1))
-                    for k, v in bytes_by_osd.iteritems()
-                },
-            }
-            pe.total_by_pool[pool] = {
-                'pgs': pgs,
-                'objects': objects,
-                'bytes': bytes,
-            }
-        for root, m in pe.total_by_root.iteritems():
-            pe.count_by_root[root] = {
-                'pgs': {
-                    k: float(v)
-                    for k, v in actual_by_root[root]['pgs'].iteritems()
-                },
-                'objects': {
-                    k: float(v)
-                    for k, v in actual_by_root[root]['objects'].iteritems()
-                },
-                'bytes': {
-                    k: float(v)
-                    for k, v in actual_by_root[root]['bytes'].iteritems()
-                },
-            }
-            pe.actual_by_root[root] = {
-                'pgs': {
-                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
-                    for k, v in actual_by_root[root]['pgs'].iteritems()
-                },
-                'objects': {
-                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
-                    for k, v in actual_by_root[root]['objects'].iteritems()
-                },
-                'bytes': {
-                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
-                    for k, v in actual_by_root[root]['bytes'].iteritems()
-                },
-            }
-        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
-        self.log.debug('actual_by_root %s' % pe.actual_by_root)
-
-        # average and stddev and score
-        pe.stats_by_root = {
-            a: pe.calc_stats(
-                b,
-                pe.target_by_root[a],
-                pe.total_by_root[a]
-            ) for a, b in pe.count_by_root.iteritems()
-        }
-
-       # the scores are already normalized
-        pe.score_by_root = {
-            r: {
-                'pgs': pe.stats_by_root[r]['pgs']['score'],
-                'objects': pe.stats_by_root[r]['objects']['score'],
-                'bytes': pe.stats_by_root[r]['bytes']['score'],
-            } for r in pe.total_by_root.keys()
-        }
-
-        # total score is just average of normalized stddevs
-        pe.score = 0.0
-        for r, vs in pe.score_by_root.iteritems():
-            for k, v in vs.iteritems():
-                pe.score += v
-        pe.score /= 3 * len(roots)
-        return pe
-
-    def evaluate(self, ms, verbose=False):
-        pe = self.calc_eval(ms)
-        return pe.show(verbose=verbose)
-
-    def optimize(self, plan):
-        self.log.info('Optimize plan %s' % plan.name)
-        plan.mode = self.get_config('mode', default_mode)
-        max_misplaced = float(self.get_config('max_misplaced',
-                                              default_max_misplaced))
-        self.log.info('Mode %s, max misplaced %f' %
-                      (plan.mode, max_misplaced))
-
-        info = self.get('pg_status')
-        unknown = info.get('unknown_pgs_ratio', 0.0)
-        degraded = info.get('degraded_ratio', 0.0)
-        inactive = info.get('inactive_pgs_ratio', 0.0)
-        misplaced = info.get('misplaced_ratio', 0.0)
-        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
-                       unknown, degraded, inactive, misplaced)
-        if unknown > 0.0:
-            self.log.info('Some PGs (%f) are unknown; waiting', unknown)
-        elif degraded > 0.0:
-            self.log.info('Some objects (%f) are degraded; waiting', degraded)
-        elif inactive > 0.0:
-            self.log.info('Some PGs (%f) are inactive; waiting', inactive)
-        elif misplaced >= max_misplaced:
-            self.log.info('Too many objects (%f > %f) are misplaced; waiting',
-                          misplaced, max_misplaced)
-        else:
-            if plan.mode == 'upmap':
-                return self.do_upmap(plan)
-            elif plan.mode == 'crush-compat':
-                return self.do_crush_compat(plan)
-            elif plan.mode == 'none':
-                self.log.info('Idle')
-            else:
-                self.log.info('Unrecognized mode %s' % plan.mode)
-        return False
-
-        ##
-
-    def do_upmap(self, plan):
-        self.log.info('do_upmap')
-        max_iterations = self.get_config('upmap_max_iterations', 10)
-        max_deviation = self.get_config('upmap_max_deviation', .01)
-
-        ms = plan.initial
-        pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools',[])]
-        if len(pools) == 0:
-            self.log.info('no pools, nothing to do')
-            return False
-        # shuffle pool list so they all get equal (in)attention
-        random.shuffle(pools)
-        self.log.info('pools %s' % pools)
-
-        inc = plan.inc
-        total_did = 0
-        left = max_iterations
-        for pool in pools:
-            did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
-            total_did += did
-            left -= did
-            if left <= 0:
-                break
-        self.log.info('prepared %d/%d changes' % (total_did, max_iterations))
-        return True
-
-    def do_crush_compat(self, plan):
-        self.log.info('do_crush_compat')
-        max_iterations = self.get_config('crush_compat_max_iterations', 25)
-        if max_iterations < 1:
-            return False
-        step = self.get_config('crush_compat_step', .5)
-        if step <= 0 or step >= 1.0:
-            return False
-        max_misplaced = float(self.get_config('max_misplaced',
-                                              default_max_misplaced))
-        min_pg_per_osd = 2
-
-        ms = plan.initial
-        osdmap = ms.osdmap
-        crush = osdmap.get_crush()
-        pe = self.calc_eval(ms)
-        if pe.score == 0:
-            self.log.info('Distribution is already perfect')
-            return False
-
-        # get current osd reweights
-        orig_osd_weight = { a['osd']: a['weight']
-                            for a in ms.osdmap_dump.get('osds',[]) }
-        reweighted_osds = [ a for a,b in orig_osd_weight.iteritems()
-                            if b < 1.0 and b > 0.0 ]
-
-        # get current compat weight-set weights
-        orig_ws = self.get_compat_weight_set_weights()
-        orig_ws = { a: b for a, b in orig_ws.iteritems() if a >= 0 }
-
-        # Make sure roots don't overlap their devices.  If so, we
-        # can't proceed.
-        roots = pe.target_by_root.keys()
-        self.log.debug('roots %s', roots)
-        visited = {}
-        overlap = {}
-        root_ids = {}
-        for root, wm in pe.target_by_root.iteritems():
-            for osd in wm.iterkeys():
-                if osd in visited:
-                    overlap[osd] = 1
-                visited[osd] = 1
-        if len(overlap) > 0:
-            self.log.err('error: some osds belong to multiple subtrees: %s' %
-                         overlap)
-            return False
-
-        key = 'pgs'  # pgs objects or bytes
-
-        # go
-        best_ws = copy.deepcopy(orig_ws)
-        best_ow = copy.deepcopy(orig_osd_weight)
-        best_pe = pe
-        left = max_iterations
-        bad_steps = 0
-        next_ws = copy.deepcopy(best_ws)
-        next_ow = copy.deepcopy(best_ow)
-        while left > 0:
-            # adjust
-            self.log.debug('best_ws %s' % best_ws)
-            random.shuffle(roots)
-            for root in roots:
-                pools = best_pe.root_pools[root]
-                pgs = len(best_pe.target_by_root[root])
-                min_pgs = pgs * min_pg_per_osd
-                if best_pe.total_by_root[root] < min_pgs:
-                    self.log.info('Skipping root %s (pools %s), total pgs %d '
-                                  '< minimum %d (%d per osd)',
-                                  root, pools, pgs, min_pgs, min_pg_per_osd)
-                    continue
-                self.log.info('Balancing root %s (pools %s) by %s' %
-                              (root, pools, key))
-                target = best_pe.target_by_root[root]
-                actual = best_pe.actual_by_root[root][key]
-                queue = sorted(actual.keys(),
-                               key=lambda osd: -abs(target[osd] - actual[osd]))
-                for osd in queue:
-                    if orig_osd_weight[osd] == 0:
-                        self.log.debug('skipping out osd.%d', osd)
-                    else:
-                        deviation = target[osd] - actual[osd]
-                        if deviation == 0:
-                            break
-                        self.log.debug('osd.%d deviation %f', osd, deviation)
-                        weight = best_ws[osd]
-                        ow = orig_osd_weight[osd]
-                        if actual[osd] > 0:
-                            calc_weight = target[osd] / actual[osd] * weight * ow
-                        else:
-                            # not enough to go on here... keep orig weight
-                            calc_weight = weight / orig_osd_weight[osd]
-                        new_weight = weight * (1.0 - step) + calc_weight * step
-                        self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
-                                       new_weight)
-                        next_ws[osd] = new_weight
-                        if ow < 1.0:
-                            new_ow = min(1.0, max(step + (1.0 - step) * ow,
-                                                  ow + .005))
-                            self.log.debug('Reweight osd.%d reweight %f -> %f',
-                                           osd, ow, new_ow)
-                            next_ow[osd] = new_ow
-
-                # normalize weights under this root
-                root_weight = crush.get_item_weight(pe.root_ids[root])
-                root_sum = sum(b for a,b in next_ws.iteritems()
-                               if a in target.keys())
-                if root_sum > 0 and root_weight > 0:
-                    factor = root_sum / root_weight
-                    self.log.debug('normalizing root %s %d, weight %f, '
-                                   'ws sum %f, factor %f',
-                                   root, pe.root_ids[root], root_weight,
-                                   root_sum, factor)
-                    for osd in actual.keys():
-                        next_ws[osd] = next_ws[osd] / factor
-
-            # recalc
-            plan.compat_ws = copy.deepcopy(next_ws)
-            next_ms = plan.final_state()
-            next_pe = self.calc_eval(next_ms)
-            next_misplaced = next_ms.calc_misplaced_from(ms)
-            self.log.debug('Step result score %f -> %f, misplacing %f',
-                           best_pe.score, next_pe.score, next_misplaced)
-
-            if next_misplaced > max_misplaced:
-                if best_pe.score < pe.score:
-                    self.log.debug('Step misplaced %f > max %f, stopping',
-                                   next_misplaced, max_misplaced)
-                    break
-                step /= 2.0
-                next_ws = copy.deepcopy(best_ws)
-                next_ow = copy.deepcopy(best_ow)
-                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
-                               next_misplaced, max_misplaced, step)
-            else:
-                if next_pe.score > best_pe.score * 1.0001:
-                    if bad_steps < 5 and random.randint(0, 100) < 70:
-                        self.log.debug('Score got worse, taking another step')
-                    else:
-                        step /= 2.0
-                        next_ws = copy.deepcopy(best_ws)
-                        next_ow = copy.deepcopy(best_ow)
-                        self.log.debug('Score got worse, trying smaller step %f',
-                                       step)
-                else:
-                    bad_steps = 0
-                    best_pe = next_pe
-                    best_ws = next_ws
-                    best_ow = next_ow
-                    if best_pe.score == 0:
-                        break
-            left -= 1
-
-        # allow a small regression if we are phasing out osd weights
-        fudge = 0
-        if next_ow != orig_osd_weight:
-            fudge = .001
-
-        if best_pe.score < pe.score + fudge:
-            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
-            plan.compat_ws = best_ws
-            for osd, w in best_ow.iteritems():
-                if w != orig_osd_weight[osd]:
-                    self.log.debug('osd.%d reweight %f', osd, w)
-                    plan.osd_weights[osd] = w
-            return True
-        else:
-            self.log.info('Failed to find further optimization, score %f',
-                          pe.score)
-            return False
-
-    def get_compat_weight_set_weights(self):
-        # enable compat weight-set
-        self.log.debug('ceph osd crush weight-set create-compat')
-        result = CommandResult('')
-        self.send_command(result, 'mon', '', json.dumps({
-            'prefix': 'osd crush weight-set create-compat',
-            'format': 'json',
-        }), '')
-        r, outb, outs = result.wait()
-        if r != 0:
-            self.log.error('Error creating compat weight-set')
-            return
-
-        result = CommandResult('')
-        self.send_command(result, 'mon', '', json.dumps({
-            'prefix': 'osd crush dump',
-            'format': 'json',
-        }), '')
-        r, outb, outs = result.wait()
-        if r != 0:
-            self.log.error('Error dumping crush map')
-            return
-        try:
-            crushmap = json.loads(outb)
-        except:
-            raise RuntimeError('unable to parse crush map')
-
-        raw = crushmap.get('choose_args',{}).get('-1', [])
-        weight_set = {}
-        for b in raw:
-            bucket = None
-            for t in crushmap['buckets']:
-                if t['id'] == b['bucket_id']:
-                    bucket = t
-                    break
-            if not bucket:
-                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
-            self.log.debug('bucket items %s' % bucket['items'])
-            self.log.debug('weight set %s' % b['weight_set'][0])
-            if len(bucket['items']) != len(b['weight_set'][0]):
-                raise RuntimeError('weight-set size does not match bucket items')
-            for pos in range(len(bucket['items'])):
-                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]
-
-        self.log.debug('weight_set weights %s' % weight_set)
-        return weight_set
-
-    def do_crush(self):
-        self.log.info('do_crush (not yet implemented)')
-
-    def do_osd_weight(self):
-        self.log.info('do_osd_weight (not yet implemented)')
-
-    def execute(self, plan):
-        self.log.info('Executing plan %s' % plan.name)
-
-        commands = []
-
-        # compat weight-set
-        if len(plan.compat_ws) and \
-           '-1' not in plan.initial.crush_dump.get('choose_args', {}):
-            self.log.debug('ceph osd crush weight-set create-compat')
-            result = CommandResult('')
-            self.send_command(result, 'mon', '', json.dumps({
-                'prefix': 'osd crush weight-set create-compat',
-                'format': 'json',
-            }), '')
-            r, outb, outs = result.wait()
-            if r != 0:
-                self.log.error('Error creating compat weight-set')
-                return
-
-        for osd, weight in plan.compat_ws.iteritems():
-            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
-                          osd, weight)
-            result = CommandResult('foo')
-            self.send_command(result, 'mon', '', json.dumps({
-                'prefix': 'osd crush weight-set reweight-compat',
-                'format': 'json',
-                'item': 'osd.%d' % osd,
-                'weight': [weight],
-            }), 'foo')
-            commands.append(result)
-
-        # new_weight
-        reweightn = {}
-        for osd, weight in plan.osd_weights.iteritems():
-            reweightn[str(osd)] = str(int(weight * float(0x10000)))
-        if len(reweightn):
-            self.log.info('ceph osd reweightn %s', reweightn)
-            result = CommandResult('foo')
-            self.send_command(result, 'mon', '', json.dumps({
-                'prefix': 'osd reweightn',
-                'format': 'json',
-                'weights': json.dumps(reweightn),
-            }), 'foo')
-            commands.append(result)
-
-        # upmap
-        incdump = plan.inc.dump()
-        for pgid in incdump.get('old_pg_upmap_items', []):
-            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
-            result = CommandResult('foo')
-            self.send_command(result, 'mon', '', json.dumps({
-                'prefix': 'osd rm-pg-upmap-items',
-                'format': 'json',
-                'pgid': pgid,
-            }), 'foo')
-            commands.append(result)
-
-        for item in incdump.get('new_pg_upmap_items', []):
-            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
-                          item['mappings'])
-            osdlist = []
-            for m in item['mappings']:
-                osdlist += [m['from'], m['to']]
-            result = CommandResult('foo')
-            self.send_command(result, 'mon', '', json.dumps({
-                'prefix': 'osd pg-upmap-items',
-                'format': 'json',
-                'pgid': item['pgid'],
-                'id': osdlist,
-            }), 'foo')
-            commands.append(result)
-
-        # wait for commands
-        self.log.debug('commands %s' % commands)
-        for result in commands:
-            r, outb, outs = result.wait()
-            if r != 0:
-                self.log.error('Error on command')
-                return
-        self.log.debug('done')