X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fpybind%2Fmgr%2Fbalancer%2Fmodule.py;fp=src%2Fceph%2Fsrc%2Fpybind%2Fmgr%2Fbalancer%2Fmodule.py;h=b8cc087b9e8b8edaf446e999e82902e934165370;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/pybind/mgr/balancer/module.py b/src/ceph/src/pybind/mgr/balancer/module.py new file mode 100644 index 0000000..b8cc087 --- /dev/null +++ b/src/ceph/src/pybind/mgr/balancer/module.py @@ -0,0 +1,933 @@ + +""" +Balance PG distribution across OSDs. +""" + +import copy +import errno +import json +import math +import random +import time +from mgr_module import MgrModule, CommandResult +from threading import Event + +# available modes: 'none', 'crush', 'crush-compat', 'upmap', 'osd_weight' +default_mode = 'none' +default_sleep_interval = 60 # seconds +default_max_misplaced = .05 # max ratio of pgs replaced at a time + +TIME_FORMAT = '%Y-%m-%d_%H:%M:%S' + + +class MappingState: + def __init__(self, osdmap, pg_dump, desc=''): + self.desc = desc + self.osdmap = osdmap + self.osdmap_dump = self.osdmap.dump() + self.crush = osdmap.get_crush() + self.crush_dump = self.crush.dump() + self.pg_dump = pg_dump + self.pg_stat = { + i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', []) + } + self.poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])] + self.pg_up = {} + self.pg_up_by_poolid = {} + for poolid in self.poolids: + self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid) + for a,b in self.pg_up_by_poolid[poolid].iteritems(): + self.pg_up[a] = b + + def calc_misplaced_from(self, other_ms): + num = len(other_ms.pg_up) + misplaced = 0 + for pgid, before in other_ms.pg_up.iteritems(): + if before != self.pg_up.get(pgid, []): + misplaced += 1 + if num > 0: + return float(misplaced) / float(num) + return 0.0 + +class Plan: + def __init__(self, name, ms): + self.mode = 'unknown' + self.name = name + self.initial = ms + + self.osd_weights = {} + self.compat_ws = {} + self.inc = ms.osdmap.new_incremental() + + def final_state(self): + self.inc.set_osd_reweights(self.osd_weights) + self.inc.set_crush_compat_weight_set_weights(self.compat_ws) + return MappingState(self.initial.osdmap.apply_incremental(self.inc), + self.initial.pg_dump, + 'plan %s final' % self.name) + + def dump(self): + return json.dumps(self.inc.dump(), indent=4) + + def show(self): + ls = [] + ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch()) + ls.append('# starting crush version %d' % + self.initial.osdmap.get_crush_version()) + ls.append('# mode %s' % self.mode) + if len(self.compat_ws) and \ + '-1' not in self.initial.crush_dump.get('choose_args', {}): + ls.append('ceph osd crush weight-set create-compat') + for osd, weight in self.compat_ws.iteritems(): + ls.append('ceph osd crush weight-set reweight-compat %s %f' % + (osd, weight)) + for osd, weight in self.osd_weights.iteritems(): + ls.append('ceph osd reweight osd.%d %f' % (osd, weight)) + incdump = self.inc.dump() + for pgid in incdump.get('old_pg_upmap_items', []): + ls.append('ceph osd rm-pg-upmap-items %s' % pgid) + for item in incdump.get('new_pg_upmap_items', []): + osdlist = [] + for m in item['mappings']: + osdlist += [m['from'], m['to']] + ls.append('ceph osd pg-upmap-items %s %s' % + (item['pgid'], ' '.join([str(a) for a in osdlist]))) + return '\n'.join(ls) + + +class Eval: + root_ids = {} # root name -> id + pool_name = {} # pool id 
-> pool name + pool_id = {} # pool name -> id + pool_roots = {} # pool name -> root name + root_pools = {} # root name -> pools + target_by_root = {} # root name -> target weight map + count_by_pool = {} + count_by_root = {} + actual_by_pool = {} # pool -> by_* -> actual weight map + actual_by_root = {} # pool -> by_* -> actual weight map + total_by_pool = {} # pool -> by_* -> total + total_by_root = {} # root -> by_* -> total + stats_by_pool = {} # pool -> by_* -> stddev or avg -> value + stats_by_root = {} # root -> by_* -> stddev or avg -> value + + score_by_pool = {} + score_by_root = {} + + score = 0.0 + + def __init__(self, ms): + self.ms = ms + + def show(self, verbose=False): + if verbose: + r = self.ms.desc + '\n' + r += 'target_by_root %s\n' % self.target_by_root + r += 'actual_by_pool %s\n' % self.actual_by_pool + r += 'actual_by_root %s\n' % self.actual_by_root + r += 'count_by_pool %s\n' % self.count_by_pool + r += 'count_by_root %s\n' % self.count_by_root + r += 'total_by_pool %s\n' % self.total_by_pool + r += 'total_by_root %s\n' % self.total_by_root + r += 'stats_by_root %s\n' % self.stats_by_root + r += 'score_by_pool %s\n' % self.score_by_pool + r += 'score_by_root %s\n' % self.score_by_root + else: + r = self.ms.desc + ' ' + r += 'score %f (lower is better)\n' % self.score + return r + + def calc_stats(self, count, target, total): + num = max(len(target), 1) + r = {} + for t in ('pgs', 'objects', 'bytes'): + avg = float(total[t]) / float(num) + dev = 0.0 + + # score is a measure of how uneven the data distribution is. + # score lies between [0, 1), 0 means perfect distribution. + score = 0.0 + sum_weight = 0.0 + + for k, v in count[t].iteritems(): + # adjust/normalize by weight + if target[k]: + adjusted = float(v) / target[k] / float(num) + else: + adjusted = 0.0 + + # Overweighted devices and their weights are factors to calculate reweight_urgency. + # One 10% underfilled device with 5 2% overfilled devices, is arguably a better + # situation than one 10% overfilled with 5 2% underfilled devices + if adjusted > avg: + ''' + F(x) = 2*phi(x) - 1, where phi(x) = cdf of standard normal distribution + x = (adjusted - avg)/avg. + Since, we're considering only over-weighted devices, x >= 0, and so phi(x) lies in [0.5, 1). + To bring range of F(x) in range [0, 1), we need to make the above modification. + + In general, we need to use a function F(x), where x = (adjusted - avg)/avg + 1. which is bounded between 0 and 1, so that ultimately reweight_urgency will also be bounded. + 2. A larger value of x, should imply more urgency to reweight. + 3. Also, the difference between F(x) when x is large, should be minimal. + 4. The value of F(x) should get close to 1 (highest urgency to reweight) with steeply. + + Could have used F(x) = (1 - e^(-x)). But that had slower convergence to 1, compared to the one currently in use. 
+ + cdf of standard normal distribution: https://stackoverflow.com/a/29273201 + ''' + score += target[k] * (math.erf(((adjusted - avg)/avg) / math.sqrt(2.0))) + sum_weight += target[k] + dev += (avg - adjusted) * (avg - adjusted) + stddev = math.sqrt(dev / float(max(num - 1, 1))) + score = score / max(sum_weight, 1) + r[t] = { + 'avg': avg, + 'stddev': stddev, + 'sum_weight': sum_weight, + 'score': score, + } + return r + +class Module(MgrModule): + COMMANDS = [ + { + "cmd": "balancer status", + "desc": "Show balancer status", + "perm": "r", + }, + { + "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap", + "desc": "Set balancer mode", + "perm": "rw", + }, + { + "cmd": "balancer on", + "desc": "Enable automatic balancing", + "perm": "rw", + }, + { + "cmd": "balancer off", + "desc": "Disable automatic balancing", + "perm": "rw", + }, + { + "cmd": "balancer eval name=plan,type=CephString,req=false", + "desc": "Evaluate data distribution for the current cluster or specific plan", + "perm": "r", + }, + { + "cmd": "balancer eval-verbose name=plan,type=CephString,req=false", + "desc": "Evaluate data distribution for the current cluster or specific plan (verbosely)", + "perm": "r", + }, + { + "cmd": "balancer optimize name=plan,type=CephString", + "desc": "Run optimizer to create a new plan", + "perm": "rw", + }, + { + "cmd": "balancer show name=plan,type=CephString", + "desc": "Show details of an optimization plan", + "perm": "r", + }, + { + "cmd": "balancer rm name=plan,type=CephString", + "desc": "Discard an optimization plan", + "perm": "rw", + }, + { + "cmd": "balancer reset", + "desc": "Discard all optimization plans", + "perm": "rw", + }, + { + "cmd": "balancer dump name=plan,type=CephString", + "desc": "Show an optimization plan", + "perm": "r", + }, + { + "cmd": "balancer execute name=plan,type=CephString", + "desc": "Execute an optimization plan", + "perm": "r", + }, + ] + active = False + run = True + plans = {} + mode = '' + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + self.event = Event() + + def handle_command(self, command): + self.log.warn("Handling command: '%s'" % str(command)) + if command['prefix'] == 'balancer status': + s = { + 'plans': self.plans.keys(), + 'active': self.active, + 'mode': self.get_config('mode', default_mode), + } + return (0, json.dumps(s, indent=4), '') + elif command['prefix'] == 'balancer mode': + self.set_config('mode', command['mode']) + return (0, '', '') + elif command['prefix'] == 'balancer on': + if not self.active: + self.set_config('active', '1') + self.active = True + self.event.set() + return (0, '', '') + elif command['prefix'] == 'balancer off': + if self.active: + self.set_config('active', '') + self.active = False + self.event.set() + return (0, '', '') + elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose': + verbose = command['prefix'] == 'balancer eval-verbose' + if 'plan' in command: + plan = self.plans.get(command['plan']) + if not plan: + return (-errno.ENOENT, '', 'plan %s not found' % + command['plan']) + ms = plan.final_state() + else: + ms = MappingState(self.get_osdmap(), + self.get("pg_dump"), + 'current cluster') + return (0, self.evaluate(ms, verbose=verbose), '') + elif command['prefix'] == 'balancer optimize': + plan = self.plan_create(command['plan']) + self.optimize(plan) + return (0, '', '') + elif command['prefix'] == 'balancer rm': + self.plan_rm(command['name']) + return (0, '', '') + elif command['prefix'] == 
'balancer reset': + self.plans = {} + return (0, '', '') + elif command['prefix'] == 'balancer dump': + plan = self.plans.get(command['plan']) + if not plan: + return (-errno.ENOENT, '', 'plan %s not found' % command['plan']) + return (0, plan.dump(), '') + elif command['prefix'] == 'balancer show': + plan = self.plans.get(command['plan']) + if not plan: + return (-errno.ENOENT, '', 'plan %s not found' % command['plan']) + return (0, plan.show(), '') + elif command['prefix'] == 'balancer execute': + plan = self.plans.get(command['plan']) + if not plan: + return (-errno.ENOENT, '', 'plan %s not found' % command['plan']) + self.execute(plan) + self.plan_rm(plan) + return (0, '', '') + else: + return (-errno.EINVAL, '', + "Command not found '{0}'".format(command['prefix'])) + + def shutdown(self): + self.log.info('Stopping') + self.run = False + self.event.set() + + def time_in_interval(self, tod, begin, end): + if begin <= end: + return tod >= begin and tod < end + else: + return tod >= begin or tod < end + + def serve(self): + self.log.info('Starting') + while self.run: + self.active = self.get_config('active', '') is not '' + begin_time = self.get_config('begin_time') or '0000' + end_time = self.get_config('end_time') or '2400' + timeofday = time.strftime('%H%M', time.localtime()) + self.log.debug('Waking up [%s, scheduled for %s-%s, now %s]', + "active" if self.active else "inactive", + begin_time, end_time, timeofday) + sleep_interval = float(self.get_config('sleep_interval', + default_sleep_interval)) + if self.active and self.time_in_interval(timeofday, begin_time, end_time): + self.log.debug('Running') + name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime()) + plan = self.plan_create(name) + if self.optimize(plan): + self.execute(plan) + self.plan_rm(name) + self.log.debug('Sleeping for %d', sleep_interval) + self.event.wait(sleep_interval) + self.event.clear() + + def plan_create(self, name): + plan = Plan(name, MappingState(self.get_osdmap(), + self.get("pg_dump"), + 'plan %s initial' % name)) + self.plans[name] = plan + return plan + + def plan_rm(self, name): + if name in self.plans: + del self.plans[name] + + def calc_eval(self, ms): + pe = Eval(ms) + pool_rule = {} + pool_info = {} + for p in ms.osdmap_dump.get('pools',[]): + pe.pool_name[p['pool']] = p['pool_name'] + pe.pool_id[p['pool_name']] = p['pool'] + pool_rule[p['pool_name']] = p['crush_rule'] + pe.pool_roots[p['pool_name']] = [] + pool_info[p['pool_name']] = p + pools = pe.pool_id.keys() + if len(pools) == 0: + return pe + self.log.debug('pool_name %s' % pe.pool_name) + self.log.debug('pool_id %s' % pe.pool_id) + self.log.debug('pools %s' % pools) + self.log.debug('pool_rule %s' % pool_rule) + + osd_weight = { a['osd']: a['weight'] + for a in ms.osdmap_dump.get('osds',[]) } + + # get expected distributions by root + actual_by_root = {} + rootids = ms.crush.find_takes() + roots = [] + for rootid in rootids: + root = ms.crush.get_item_name(rootid) + pe.root_ids[root] = rootid + roots.append(root) + ls = ms.osdmap.get_pools_by_take(rootid) + pe.root_pools[root] = [] + for poolid in ls: + pe.pool_roots[pe.pool_name[poolid]].append(root) + pe.root_pools[root].append(pe.pool_name[poolid]) + weight_map = ms.crush.get_take_weight_osd_map(rootid) + adjusted_map = { + osd: cw * osd_weight.get(osd, 1.0) + for osd,cw in weight_map.iteritems() + } + sum_w = sum(adjusted_map.values()) or 1.0 + pe.target_by_root[root] = { osd: w / sum_w + for osd,w in adjusted_map.iteritems() } + actual_by_root[root] = { + 'pgs': {}, + 
'objects': {}, + 'bytes': {}, + } + for osd in pe.target_by_root[root].iterkeys(): + actual_by_root[root]['pgs'][osd] = 0 + actual_by_root[root]['objects'][osd] = 0 + actual_by_root[root]['bytes'][osd] = 0 + pe.total_by_root[root] = { + 'pgs': 0, + 'objects': 0, + 'bytes': 0, + } + self.log.debug('pool_roots %s' % pe.pool_roots) + self.log.debug('root_pools %s' % pe.root_pools) + self.log.debug('target_by_root %s' % pe.target_by_root) + + # pool and root actual + for pool, pi in pool_info.iteritems(): + poolid = pi['pool'] + pm = ms.pg_up_by_poolid[poolid] + pgs = 0 + objects = 0 + bytes = 0 + pgs_by_osd = {} + objects_by_osd = {} + bytes_by_osd = {} + for root in pe.pool_roots[pool]: + for osd in pe.target_by_root[root].iterkeys(): + pgs_by_osd[osd] = 0 + objects_by_osd[osd] = 0 + bytes_by_osd[osd] = 0 + for pgid, up in pm.iteritems(): + for osd in [int(osd) for osd in up]: + pgs_by_osd[osd] += 1 + objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects'] + bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes'] + # pick a root to associate this pg instance with. + # note that this is imprecise if the roots have + # overlapping children. + # FIXME: divide bytes by k for EC pools. + for root in pe.pool_roots[pool]: + if osd in pe.target_by_root[root]: + actual_by_root[root]['pgs'][osd] += 1 + actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects'] + actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes'] + pgs += 1 + objects += ms.pg_stat[pgid]['num_objects'] + bytes += ms.pg_stat[pgid]['num_bytes'] + pe.total_by_root[root]['pgs'] += 1 + pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects'] + pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes'] + break + pe.count_by_pool[pool] = { + 'pgs': { + k: v + for k, v in pgs_by_osd.iteritems() + }, + 'objects': { + k: v + for k, v in objects_by_osd.iteritems() + }, + 'bytes': { + k: v + for k, v in bytes_by_osd.iteritems() + }, + } + pe.actual_by_pool[pool] = { + 'pgs': { + k: float(v) / float(max(pgs, 1)) + for k, v in pgs_by_osd.iteritems() + }, + 'objects': { + k: float(v) / float(max(objects, 1)) + for k, v in objects_by_osd.iteritems() + }, + 'bytes': { + k: float(v) / float(max(bytes, 1)) + for k, v in bytes_by_osd.iteritems() + }, + } + pe.total_by_pool[pool] = { + 'pgs': pgs, + 'objects': objects, + 'bytes': bytes, + } + for root, m in pe.total_by_root.iteritems(): + pe.count_by_root[root] = { + 'pgs': { + k: float(v) + for k, v in actual_by_root[root]['pgs'].iteritems() + }, + 'objects': { + k: float(v) + for k, v in actual_by_root[root]['objects'].iteritems() + }, + 'bytes': { + k: float(v) + for k, v in actual_by_root[root]['bytes'].iteritems() + }, + } + pe.actual_by_root[root] = { + 'pgs': { + k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1)) + for k, v in actual_by_root[root]['pgs'].iteritems() + }, + 'objects': { + k: float(v) / float(max(pe.total_by_root[root]['objects'], 1)) + for k, v in actual_by_root[root]['objects'].iteritems() + }, + 'bytes': { + k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1)) + for k, v in actual_by_root[root]['bytes'].iteritems() + }, + } + self.log.debug('actual_by_pool %s' % pe.actual_by_pool) + self.log.debug('actual_by_root %s' % pe.actual_by_root) + + # average and stddev and score + pe.stats_by_root = { + a: pe.calc_stats( + b, + pe.target_by_root[a], + pe.total_by_root[a] + ) for a, b in pe.count_by_root.iteritems() + } + + # the scores are already normalized + pe.score_by_root = { + r: { + 'pgs': pe.stats_by_root[r]['pgs']['score'], 
+ 'objects': pe.stats_by_root[r]['objects']['score'], + 'bytes': pe.stats_by_root[r]['bytes']['score'], + } for r in pe.total_by_root.keys() + } + + # total score is just average of normalized stddevs + pe.score = 0.0 + for r, vs in pe.score_by_root.iteritems(): + for k, v in vs.iteritems(): + pe.score += v + pe.score /= 3 * len(roots) + return pe + + def evaluate(self, ms, verbose=False): + pe = self.calc_eval(ms) + return pe.show(verbose=verbose) + + def optimize(self, plan): + self.log.info('Optimize plan %s' % plan.name) + plan.mode = self.get_config('mode', default_mode) + max_misplaced = float(self.get_config('max_misplaced', + default_max_misplaced)) + self.log.info('Mode %s, max misplaced %f' % + (plan.mode, max_misplaced)) + + info = self.get('pg_status') + unknown = info.get('unknown_pgs_ratio', 0.0) + degraded = info.get('degraded_ratio', 0.0) + inactive = info.get('inactive_pgs_ratio', 0.0) + misplaced = info.get('misplaced_ratio', 0.0) + self.log.debug('unknown %f degraded %f inactive %f misplaced %g', + unknown, degraded, inactive, misplaced) + if unknown > 0.0: + self.log.info('Some PGs (%f) are unknown; waiting', unknown) + elif degraded > 0.0: + self.log.info('Some objects (%f) are degraded; waiting', degraded) + elif inactive > 0.0: + self.log.info('Some PGs (%f) are inactive; waiting', inactive) + elif misplaced >= max_misplaced: + self.log.info('Too many objects (%f > %f) are misplaced; waiting', + misplaced, max_misplaced) + else: + if plan.mode == 'upmap': + return self.do_upmap(plan) + elif plan.mode == 'crush-compat': + return self.do_crush_compat(plan) + elif plan.mode == 'none': + self.log.info('Idle') + else: + self.log.info('Unrecognized mode %s' % plan.mode) + return False + + ## + + def do_upmap(self, plan): + self.log.info('do_upmap') + max_iterations = self.get_config('upmap_max_iterations', 10) + max_deviation = self.get_config('upmap_max_deviation', .01) + + ms = plan.initial + pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools',[])] + if len(pools) == 0: + self.log.info('no pools, nothing to do') + return False + # shuffle pool list so they all get equal (in)attention + random.shuffle(pools) + self.log.info('pools %s' % pools) + + inc = plan.inc + total_did = 0 + left = max_iterations + for pool in pools: + did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool]) + total_did += did + left -= did + if left <= 0: + break + self.log.info('prepared %d/%d changes' % (total_did, max_iterations)) + return True + + def do_crush_compat(self, plan): + self.log.info('do_crush_compat') + max_iterations = self.get_config('crush_compat_max_iterations', 25) + if max_iterations < 1: + return False + step = self.get_config('crush_compat_step', .5) + if step <= 0 or step >= 1.0: + return False + max_misplaced = float(self.get_config('max_misplaced', + default_max_misplaced)) + min_pg_per_osd = 2 + + ms = plan.initial + osdmap = ms.osdmap + crush = osdmap.get_crush() + pe = self.calc_eval(ms) + if pe.score == 0: + self.log.info('Distribution is already perfect') + return False + + # get current osd reweights + orig_osd_weight = { a['osd']: a['weight'] + for a in ms.osdmap_dump.get('osds',[]) } + reweighted_osds = [ a for a,b in orig_osd_weight.iteritems() + if b < 1.0 and b > 0.0 ] + + # get current compat weight-set weights + orig_ws = self.get_compat_weight_set_weights() + orig_ws = { a: b for a, b in orig_ws.iteritems() if a >= 0 } + + # Make sure roots don't overlap their devices. If so, we + # can't proceed. 
+ roots = pe.target_by_root.keys() + self.log.debug('roots %s', roots) + visited = {} + overlap = {} + root_ids = {} + for root, wm in pe.target_by_root.iteritems(): + for osd in wm.iterkeys(): + if osd in visited: + overlap[osd] = 1 + visited[osd] = 1 + if len(overlap) > 0: + self.log.err('error: some osds belong to multiple subtrees: %s' % + overlap) + return False + + key = 'pgs' # pgs objects or bytes + + # go + best_ws = copy.deepcopy(orig_ws) + best_ow = copy.deepcopy(orig_osd_weight) + best_pe = pe + left = max_iterations + bad_steps = 0 + next_ws = copy.deepcopy(best_ws) + next_ow = copy.deepcopy(best_ow) + while left > 0: + # adjust + self.log.debug('best_ws %s' % best_ws) + random.shuffle(roots) + for root in roots: + pools = best_pe.root_pools[root] + pgs = len(best_pe.target_by_root[root]) + min_pgs = pgs * min_pg_per_osd + if best_pe.total_by_root[root] < min_pgs: + self.log.info('Skipping root %s (pools %s), total pgs %d ' + '< minimum %d (%d per osd)', + root, pools, pgs, min_pgs, min_pg_per_osd) + continue + self.log.info('Balancing root %s (pools %s) by %s' % + (root, pools, key)) + target = best_pe.target_by_root[root] + actual = best_pe.actual_by_root[root][key] + queue = sorted(actual.keys(), + key=lambda osd: -abs(target[osd] - actual[osd])) + for osd in queue: + if orig_osd_weight[osd] == 0: + self.log.debug('skipping out osd.%d', osd) + else: + deviation = target[osd] - actual[osd] + if deviation == 0: + break + self.log.debug('osd.%d deviation %f', osd, deviation) + weight = best_ws[osd] + ow = orig_osd_weight[osd] + if actual[osd] > 0: + calc_weight = target[osd] / actual[osd] * weight * ow + else: + # not enough to go on here... keep orig weight + calc_weight = weight / orig_osd_weight[osd] + new_weight = weight * (1.0 - step) + calc_weight * step + self.log.debug('Reweight osd.%d %f -> %f', osd, weight, + new_weight) + next_ws[osd] = new_weight + if ow < 1.0: + new_ow = min(1.0, max(step + (1.0 - step) * ow, + ow + .005)) + self.log.debug('Reweight osd.%d reweight %f -> %f', + osd, ow, new_ow) + next_ow[osd] = new_ow + + # normalize weights under this root + root_weight = crush.get_item_weight(pe.root_ids[root]) + root_sum = sum(b for a,b in next_ws.iteritems() + if a in target.keys()) + if root_sum > 0 and root_weight > 0: + factor = root_sum / root_weight + self.log.debug('normalizing root %s %d, weight %f, ' + 'ws sum %f, factor %f', + root, pe.root_ids[root], root_weight, + root_sum, factor) + for osd in actual.keys(): + next_ws[osd] = next_ws[osd] / factor + + # recalc + plan.compat_ws = copy.deepcopy(next_ws) + next_ms = plan.final_state() + next_pe = self.calc_eval(next_ms) + next_misplaced = next_ms.calc_misplaced_from(ms) + self.log.debug('Step result score %f -> %f, misplacing %f', + best_pe.score, next_pe.score, next_misplaced) + + if next_misplaced > max_misplaced: + if best_pe.score < pe.score: + self.log.debug('Step misplaced %f > max %f, stopping', + next_misplaced, max_misplaced) + break + step /= 2.0 + next_ws = copy.deepcopy(best_ws) + next_ow = copy.deepcopy(best_ow) + self.log.debug('Step misplaced %f > max %f, reducing step to %f', + next_misplaced, max_misplaced, step) + else: + if next_pe.score > best_pe.score * 1.0001: + if bad_steps < 5 and random.randint(0, 100) < 70: + self.log.debug('Score got worse, taking another step') + else: + step /= 2.0 + next_ws = copy.deepcopy(best_ws) + next_ow = copy.deepcopy(best_ow) + self.log.debug('Score got worse, trying smaller step %f', + step) + else: + bad_steps = 0 + best_pe = next_pe + best_ws = 
next_ws + best_ow = next_ow + if best_pe.score == 0: + break + left -= 1 + + # allow a small regression if we are phasing out osd weights + fudge = 0 + if next_ow != orig_osd_weight: + fudge = .001 + + if best_pe.score < pe.score + fudge: + self.log.info('Success, score %f -> %f', pe.score, best_pe.score) + plan.compat_ws = best_ws + for osd, w in best_ow.iteritems(): + if w != orig_osd_weight[osd]: + self.log.debug('osd.%d reweight %f', osd, w) + plan.osd_weights[osd] = w + return True + else: + self.log.info('Failed to find further optimization, score %f', + pe.score) + return False + + def get_compat_weight_set_weights(self): + # enable compat weight-set + self.log.debug('ceph osd crush weight-set create-compat') + result = CommandResult('') + self.send_command(result, 'mon', '', json.dumps({ + 'prefix': 'osd crush weight-set create-compat', + 'format': 'json', + }), '') + r, outb, outs = result.wait() + if r != 0: + self.log.error('Error creating compat weight-set') + return + + result = CommandResult('') + self.send_command(result, 'mon', '', json.dumps({ + 'prefix': 'osd crush dump', + 'format': 'json', + }), '') + r, outb, outs = result.wait() + if r != 0: + self.log.error('Error dumping crush map') + return + try: + crushmap = json.loads(outb) + except: + raise RuntimeError('unable to parse crush map') + + raw = crushmap.get('choose_args',{}).get('-1', []) + weight_set = {} + for b in raw: + bucket = None + for t in crushmap['buckets']: + if t['id'] == b['bucket_id']: + bucket = t + break + if not bucket: + raise RuntimeError('could not find bucket %s' % b['bucket_id']) + self.log.debug('bucket items %s' % bucket['items']) + self.log.debug('weight set %s' % b['weight_set'][0]) + if len(bucket['items']) != len(b['weight_set'][0]): + raise RuntimeError('weight-set size does not match bucket items') + for pos in range(len(bucket['items'])): + weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos] + + self.log.debug('weight_set weights %s' % weight_set) + return weight_set + + def do_crush(self): + self.log.info('do_crush (not yet implemented)') + + def do_osd_weight(self): + self.log.info('do_osd_weight (not yet implemented)') + + def execute(self, plan): + self.log.info('Executing plan %s' % plan.name) + + commands = [] + + # compat weight-set + if len(plan.compat_ws) and \ + '-1' not in plan.initial.crush_dump.get('choose_args', {}): + self.log.debug('ceph osd crush weight-set create-compat') + result = CommandResult('') + self.send_command(result, 'mon', '', json.dumps({ + 'prefix': 'osd crush weight-set create-compat', + 'format': 'json', + }), '') + r, outb, outs = result.wait() + if r != 0: + self.log.error('Error creating compat weight-set') + return + + for osd, weight in plan.compat_ws.iteritems(): + self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f', + osd, weight) + result = CommandResult('foo') + self.send_command(result, 'mon', '', json.dumps({ + 'prefix': 'osd crush weight-set reweight-compat', + 'format': 'json', + 'item': 'osd.%d' % osd, + 'weight': [weight], + }), 'foo') + commands.append(result) + + # new_weight + reweightn = {} + for osd, weight in plan.osd_weights.iteritems(): + reweightn[str(osd)] = str(int(weight * float(0x10000))) + if len(reweightn): + self.log.info('ceph osd reweightn %s', reweightn) + result = CommandResult('foo') + self.send_command(result, 'mon', '', json.dumps({ + 'prefix': 'osd reweightn', + 'format': 'json', + 'weights': json.dumps(reweightn), + }), 'foo') + commands.append(result) + + # upmap + incdump = 
plan.inc.dump() + for pgid in incdump.get('old_pg_upmap_items', []): + self.log.info('ceph osd rm-pg-upmap-items %s', pgid) + result = CommandResult('foo') + self.send_command(result, 'mon', '', json.dumps({ + 'prefix': 'osd rm-pg-upmap-items', + 'format': 'json', + 'pgid': pgid, + }), 'foo') + commands.append(result) + + for item in incdump.get('new_pg_upmap_items', []): + self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'], + item['mappings']) + osdlist = [] + for m in item['mappings']: + osdlist += [m['from'], m['to']] + result = CommandResult('foo') + self.send_command(result, 'mon', '', json.dumps({ + 'prefix': 'osd pg-upmap-items', + 'format': 'json', + 'pgid': item['pgid'], + 'id': osdlist, + }), 'foo') + commands.append(result) + + # wait for commands + self.log.debug('commands %s' % commands) + for result in commands: + r, outb, outs = result.wait() + if r != 0: + self.log.error('Error on command') + return + self.log.debug('done')
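
The scoring comment inside Eval.calc_stats above is easier to see in isolation. The minimal sketch below (not part of the patch; the utilization figures are invented for illustration) evaluates just the bounded urgency term F(x) = 2*phi(x) - 1 = erf(x / sqrt(2)) that the module applies to each over-weighted device, before that term is weighted by target[k] and normalized by the summed target weights:

import math

def urgency(adjusted, avg):
    # x is the relative overfill of one device; erf(x / sqrt(2)) = 2*phi(x) - 1
    # stays in [0, 1) and climbs steeply toward 1 as the overfill grows.
    x = (adjusted - avg) / avg
    return math.erf(x / math.sqrt(2.0))

avg = 100.0
for adjusted in (102.0, 110.0, 150.0, 300.0):
    print('adjusted=%.0f urgency=%.4f' % (adjusted, urgency(adjusted, avg)))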
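
Module.optimize throttles itself on the cluster-wide misplaced ratio, and MappingState.calc_misplaced_from measures how disruptive a candidate plan would be: the fraction of PGs whose up set differs between two mapping states. A toy illustration of that calculation with hand-written pg-to-up maps (the pgids and OSD ids are made up):

before = {'1.0': [0, 1, 2], '1.1': [1, 2, 3], '1.2': [2, 3, 4]}
after  = {'1.0': [0, 1, 2], '1.1': [1, 2, 5], '1.2': [2, 3, 4]}

# one of three PGs would land on a different up set
changed = sum(1 for pgid, up in before.items() if after.get(pgid, []) != up)
print(float(changed) / max(len(before), 1))  # 0.33, well above default_max_misplaced (0.05)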
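
The crush-compat path nudges each compat weight-set entry toward the weight implied by the target/actual ratio, damped by the configurable step (default .5) and by the OSD's current reweight. A small sketch of one such update, mirroring the actual > 0 branch of do_crush_compat (the share and weight values are illustrative only):

def compat_step(target, actual, weight, ow, step=0.5):
    # calc_weight is the weight that would make this OSD's share match its target;
    # the new weight only moves part of the way there, controlled by step.
    calc_weight = target / actual * weight * ow
    return weight * (1.0 - step) + calc_weight * step

print(compat_step(target=0.25, actual=0.30, weight=1.0, ow=1.0))  # ~0.9167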