"""
Balance PG distribution across OSDs.
"""

import copy
import errno
import json
import math
import random
import time
from mgr_module import MgrModule, CommandResult
from threading import Event

# available modes: 'none', 'crush', 'crush-compat', 'upmap', 'osd_weight'
default_mode = 'none'
default_sleep_interval = 60   # seconds
default_max_misplaced = .05   # max ratio of PGs misplaced at a time

TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
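
# A typical manual workflow with this module, built from the commands
# registered in COMMANDS below ('myplan' is an arbitrary example name):
#
#   ceph balancer mode crush-compat   # or 'upmap'
#   ceph balancer optimize myplan
#   ceph balancer show myplan         # preview the proposed changes
#   ceph balancer execute myplan
#   ceph balancer on                  # enable automatic balancing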
21
22
23 class MappingState:
24     def __init__(self, osdmap, pg_dump, desc=''):
25         self.desc = desc
26         self.osdmap = osdmap
27         self.osdmap_dump = self.osdmap.dump()
28         self.crush = osdmap.get_crush()
29         self.crush_dump = self.crush.dump()
30         self.pg_dump = pg_dump
31         self.pg_stat = {
32             i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
33         }
34         self.poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
35         self.pg_up = {}
36         self.pg_up_by_poolid = {}
37         for poolid in self.poolids:
38             self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
39             for a,b in self.pg_up_by_poolid[poolid].iteritems():
40                 self.pg_up[a] = b
41
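    # calc_misplaced_from() returns the fraction of PGs whose up set
    # changed relative to other_ms.  With hypothetical numbers: if
    # other_ms mapped 100 PGs and 2 of them now map to a different OSD
    # set, the result is 2/100 = 0.02.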
    def calc_misplaced_from(self, other_ms):
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in other_ms.pg_up.iteritems():
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0

class Plan:
    def __init__(self, name, ms):
        self.mode = 'unknown'
        self.name = name
        self.initial = ms

        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = ms.osdmap.new_incremental()

    def final_state(self):
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.pg_dump,
                            'plan %s final' % self.name)

    def dump(self):
        return json.dumps(self.inc.dump(), indent=4)

    def show(self):
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        if len(self.compat_ws) and \
           '-1' not in self.initial.crush_dump.get('choose_args', {}):
            ls.append('ceph osd crush weight-set create-compat')
        for osd, weight in self.compat_ws.iteritems():
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in self.osd_weights.iteritems():
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)


class Eval:
    # initialize all state per instance; class-level dicts would be
    # shared (and mutated) across every Eval created by calc_eval()
    def __init__(self, ms):
        self.ms = ms
        self.root_ids = {}        # root name -> id
        self.pool_name = {}       # pool id -> pool name
        self.pool_id = {}         # pool name -> id
        self.pool_roots = {}      # pool name -> root name
        self.root_pools = {}      # root name -> pools
        self.target_by_root = {}  # root name -> target weight map
        self.count_by_pool = {}
        self.count_by_root = {}
        self.actual_by_pool = {}  # pool -> by_* -> actual weight map
        self.actual_by_root = {}  # root -> by_* -> actual weight map
        self.total_by_pool = {}   # pool -> by_* -> total
        self.total_by_root = {}   # root -> by_* -> total
        self.stats_by_pool = {}   # pool -> by_* -> stddev or avg -> value
        self.stats_by_root = {}   # root -> by_* -> stddev or avg -> value

        self.score_by_pool = {}
        self.score_by_root = {}

        self.score = 0.0

    def show(self, verbose=False):
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
        r += 'score %f (lower is better)\n' % self.score
        return r

    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r = {}
        for t in ('pgs', 'objects', 'bytes'):
            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is.
            # score lies in [0, 1), where 0 means perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in count[t].iteritems():
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Overweighted devices and their weights are the factors
                # used to calculate reweight_urgency.  One 10% underfilled
                # device alongside five 2% overfilled devices is arguably a
                # better situation than one 10% overfilled device alongside
                # five 2% underfilled devices.
                if adjusted > avg:
                    '''
                    F(x) = 2*phi(x) - 1, where phi(x) is the cdf of the
                    standard normal distribution and x = (adjusted - avg)/avg.
                    Since we only consider over-weighted devices here, x >= 0,
                    so phi(x) lies in [0.5, 1); the transform above maps the
                    range of F(x) into [0, 1).

                    In general we need a function F(x) of x = (adjusted - avg)/avg
                    which:
                    1. is bounded between 0 and 1, so that reweight_urgency is
                       also bounded;
                    2. grows with x: a larger x should imply more urgency to
                       reweight;
                    3. flattens out for large x: the differences between F(x)
                       values should become minimal;
                    4. approaches 1 (highest urgency to reweight) steeply.

                    F(x) = 1 - e^(-x) would also satisfy these, but it
                    converges to 1 more slowly than the form used here.

                    cdf of standard normal distribution:
                    https://stackoverflow.com/a/29273201
                    '''
                    score += target[k] * (math.erf(((adjusted - avg)/avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
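            # sample standard deviation of the weight-adjusted values
            # (n - 1 in the denominator, guarded for num == 1)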
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r
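
    # Worked example with hypothetical numbers: if a device's
    # weight-adjusted utilization 'adjusted' is 20% above 'avg', then
    # x = (adjusted - avg)/avg = 0.2 and its contribution to the score is
    # target[k] * erf(0.2 / sqrt(2)) ~= 0.159 * target[k].  The final
    # score divides the summed contributions by the total weight of the
    # overfilled devices, keeping it in [0, 1).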

class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "balancer status",
            "desc": "Show balancer status",
            "perm": "r",
        },
        {
            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
            "desc": "Set balancer mode",
            "perm": "rw",
        },
        {
            "cmd": "balancer on",
            "desc": "Enable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer off",
            "desc": "Disable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer eval name=plan,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific plan",
            "perm": "r",
        },
        {
            "cmd": "balancer eval-verbose name=plan,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific plan (verbosely)",
            "perm": "r",
        },
        {
            "cmd": "balancer optimize name=plan,type=CephString",
            "desc": "Run optimizer to create a new plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer show name=plan,type=CephString",
            "desc": "Show details of an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer rm name=plan,type=CephString",
            "desc": "Discard an optimization plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer reset",
            "desc": "Discard all optimization plans",
            "perm": "rw",
        },
        {
            "cmd": "balancer dump name=plan,type=CephString",
            "desc": "Show an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer execute name=plan,type=CephString",
            "desc": "Execute an optimization plan",
            "perm": "r",
        },
    ]
    active = False
    run = True
    plans = {}
    mode = ''

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()

    def handle_command(self, command):
        self.log.warn("Handling command: '%s'" % str(command))
        if command['prefix'] == 'balancer status':
            s = {
                'plans': self.plans.keys(),
                'active': self.active,
                'mode': self.get_config('mode', default_mode),
            }
            return (0, json.dumps(s, indent=4), '')
        elif command['prefix'] == 'balancer mode':
            self.set_config('mode', command['mode'])
            return (0, '', '')
        elif command['prefix'] == 'balancer on':
            if not self.active:
                self.set_config('active', '1')
                self.active = True
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer off':
            if self.active:
                self.set_config('active', '')
                self.active = False
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
            verbose = command['prefix'] == 'balancer eval-verbose'
            if 'plan' in command:
                plan = self.plans.get(command['plan'])
                if not plan:
                    return (-errno.ENOENT, '', 'plan %s not found' %
                            command['plan'])
                ms = plan.final_state()
            else:
                ms = MappingState(self.get_osdmap(),
                                  self.get("pg_dump"),
                                  'current cluster')
            return (0, self.evaluate(ms, verbose=verbose), '')
        elif command['prefix'] == 'balancer optimize':
            plan = self.plan_create(command['plan'])
            self.optimize(plan)
            return (0, '', '')
        elif command['prefix'] == 'balancer rm':
            self.plan_rm(command['plan'])
            return (0, '', '')
        elif command['prefix'] == 'balancer reset':
            self.plans = {}
            return (0, '', '')
        elif command['prefix'] == 'balancer dump':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.dump(), '')
        elif command['prefix'] == 'balancer show':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.show(), '')
        elif command['prefix'] == 'balancer execute':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            self.execute(plan)
            self.plan_rm(plan.name)
            return (0, '', '')
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))

    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

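    # tod/begin/end are 'HHMM' strings compared lexically.  The second
    # branch handles intervals that wrap midnight: e.g. with begin='2300'
    # and end='0700', tod='0130' is inside but tod='1200' is not.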
    def time_in_interval(self, tod, begin, end):
        if begin <= end:
            return tod >= begin and tod < end
        else:
            return tod >= begin or tod < end

    def serve(self):
        self.log.info('Starting')
        while self.run:
            self.active = self.get_config('active', '') != ''
            begin_time = self.get_config('begin_time') or '0000'
            end_time = self.get_config('end_time') or '2400'
            timeofday = time.strftime('%H%M', time.localtime())
            self.log.debug('Waking up [%s, scheduled for %s-%s, now %s]',
                           "active" if self.active else "inactive",
                           begin_time, end_time, timeofday)
            sleep_interval = float(self.get_config('sleep_interval',
                                                   default_sleep_interval))
            if self.active and self.time_in_interval(timeofday, begin_time, end_time):
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                plan = self.plan_create(name)
                if self.optimize(plan):
                    self.execute(plan)
                self.plan_rm(name)
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def plan_create(self, name):
        plan = Plan(name, MappingState(self.get_osdmap(),
                                       self.get("pg_dump"),
                                       'plan %s initial' % name))
        self.plans[name] = plan
        return plan

    def plan_rm(self, name):
        if name in self.plans:
            del self.plans[name]

    def calc_eval(self, ms):
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools', []):
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        pools = pe.pool_id.keys()
        if len(pools) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = { a['osd']: a['weight']
                       for a in ms.osdmap_dump.get('osds', []) }

        # get expected distributions by root
        actual_by_root = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            root = ms.crush.get_item_name(rootid)
            pe.root_ids[root] = rootid
            roots.append(root)
            ls = ms.osdmap.get_pools_by_take(rootid)
            pe.root_pools[root] = []
            for poolid in ls:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
            adjusted_map = {
                osd: cw * osd_weight.get(osd, 1.0)
                for osd, cw in weight_map.iteritems()
            }
            sum_w = sum(adjusted_map.values()) or 1.0
            pe.target_by_root[root] = { osd: w / sum_w
                                        for osd, w in adjusted_map.iteritems() }
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root].iterkeys():
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)

        # pool and root actual
        for pool, pi in pool_info.iteritems():
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for root in pe.pool_roots[pool]:
                for osd in pe.target_by_root[root].iterkeys():
                    pgs_by_osd[osd] = 0
                    objects_by_osd[osd] = 0
                    bytes_by_osd[osd] = 0
            for pgid, up in pm.iteritems():
                for osd in [int(osd) for osd in up]:
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in pgs_by_osd.iteritems()
                },
                'objects': {
                    k: v
                    for k, v in objects_by_osd.iteritems()
                },
                'bytes': {
                    k: v
                    for k, v in bytes_by_osd.iteritems()
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in pgs_by_osd.iteritems()
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in objects_by_osd.iteritems()
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in bytes_by_osd.iteritems()
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root, m in pe.total_by_root.iteritems():
            pe.count_by_root[root] = {
                'pgs': {
                    k: float(v)
                    for k, v in actual_by_root[root]['pgs'].iteritems()
                },
                'objects': {
                    k: float(v)
                    for k, v in actual_by_root[root]['objects'].iteritems()
                },
                'bytes': {
                    k: float(v)
                    for k, v in actual_by_root[root]['bytes'].iteritems()
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in actual_by_root[root]['pgs'].iteritems()
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in actual_by_root[root]['objects'].iteritems()
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in actual_by_root[root]['bytes'].iteritems()
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)

        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in pe.count_by_root.iteritems()
        }

        # the scores are already normalized
        pe.score_by_root = {
            r: {
                'pgs': pe.stats_by_root[r]['pgs']['score'],
                'objects': pe.stats_by_root[r]['objects']['score'],
                'bytes': pe.stats_by_root[r]['bytes']['score'],
            } for r in pe.total_by_root.keys()
        }

        # total score is just the average of the normalized scores across
        # the three metrics and all roots
        pe.score = 0.0
        for r, vs in pe.score_by_root.iteritems():
            for k, v in vs.iteritems():
                pe.score += v
        pe.score /= 3 * len(roots)
        return pe

    def evaluate(self, ms, verbose=False):
        pe = self.calc_eval(ms)
        return pe.show(verbose=verbose)

    def optimize(self, plan):
        self.log.info('Optimize plan %s' % plan.name)
        plan.mode = self.get_config('mode', default_mode)
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            self.log.info('Some PGs (%f) are unknown; waiting', unknown)
        elif degraded > 0.0:
            self.log.info('Some objects (%f) are degraded; waiting', degraded)
        elif inactive > 0.0:
            self.log.info('Some PGs (%f) are inactive; waiting', inactive)
        elif misplaced >= max_misplaced:
            self.log.info('Too many objects (%f > %f) are misplaced; waiting',
                          misplaced, max_misplaced)
        else:
            if plan.mode == 'upmap':
                return self.do_upmap(plan)
            elif plan.mode == 'crush-compat':
                return self.do_crush_compat(plan)
            elif plan.mode == 'none':
                self.log.info('Idle')
            else:
                self.log.info('Unrecognized mode %s' % plan.mode)
        return False

    def do_upmap(self, plan):
        self.log.info('do_upmap')
        # get_config returns a string once the option has been set
        max_iterations = int(self.get_config('upmap_max_iterations', 10))
        max_deviation = float(self.get_config('upmap_max_deviation', .01))

        ms = plan.initial
        pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools', [])]
        if len(pools) == 0:
            self.log.info('no pools, nothing to do')
            return False
        # shuffle pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        inc = plan.inc
        total_did = 0
        left = max_iterations
        for pool in pools:
            did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_iterations))
        return True

    def do_crush_compat(self, plan):
        self.log.info('do_crush_compat')
        max_iterations = int(self.get_config('crush_compat_max_iterations', 25))
        if max_iterations < 1:
            return False
        step = float(self.get_config('crush_compat_step', .5))
        if step <= 0 or step >= 1.0:
            return False
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms)
        if pe.score == 0:
            self.log.info('Distribution is already perfect')
            return False

        # get current osd reweights
        orig_osd_weight = { a['osd']: a['weight']
                            for a in ms.osdmap_dump.get('osds', []) }
        reweighted_osds = [ a for a, b in orig_osd_weight.iteritems()
                            if b < 1.0 and b > 0.0 ]

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights()
        orig_ws = { a: b for a, b in orig_ws.iteritems() if a >= 0 }

        # Make sure roots don't overlap their devices.  If so, we
        # can't proceed.
        roots = pe.target_by_root.keys()
        self.log.debug('roots %s', roots)
        visited = {}
        overlap = {}
        root_ids = {}
        for root, wm in pe.target_by_root.iteritems():
            for osd in wm.iterkeys():
                if osd in visited:
                    overlap[osd] = 1
                visited[osd] = 1
        if len(overlap) > 0:
            self.log.error('error: some osds belong to multiple subtrees: %s' %
                           overlap)
            return False

        key = 'pgs'  # pgs, objects, or bytes

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                osds = len(best_pe.target_by_root[root])
                min_pgs = osds * min_pg_per_osd
                if best_pe.total_by_root[root]['pgs'] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools,
                                  best_pe.total_by_root[root]['pgs'],
                                  min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                    else:
                        deviation = target[osd] - actual[osd]
                        if deviation == 0:
                            break
                        self.log.debug('osd.%d deviation %f', osd, deviation)
                        weight = best_ws[osd]
                        ow = orig_osd_weight[osd]
                        if actual[osd] > 0:
                            calc_weight = target[osd] / actual[osd] * weight * ow
                        else:
                            # not enough to go on here... keep orig weight
                            calc_weight = weight / orig_osd_weight[osd]
                        new_weight = weight * (1.0 - step) + calc_weight * step
                        self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                       new_weight)
                        next_ws[osd] = new_weight
                        if ow < 1.0:
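                            # gradually phase the legacy reweight back
                            # toward 1.0: a step-weighted move, advancing
                            # by at least 0.005 per iteration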
                            new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                                  ow + .005))
                            self.log.debug('Reweight osd.%d reweight %f -> %f',
                                           osd, ow, new_ow)
                            next_ow[osd] = new_ow

                # normalize weights under this root
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a, b in next_ws.iteritems()
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    bad_steps += 1
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = next_ws
                    best_ow = next_ow
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0
        if next_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in best_ow.iteritems():
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return True
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            return False

    def get_compat_weight_set_weights(self):
        # enable compat weight-set
        self.log.debug('ceph osd crush weight-set create-compat')
        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd crush weight-set create-compat',
            'format': 'json',
        }), '')
        r, outb, outs = result.wait()
        if r != 0:
            self.log.error('Error creating compat weight-set')
            return

        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd crush dump',
            'format': 'json',
        }), '')
        r, outb, outs = result.wait()
        if r != 0:
            self.log.error('Error dumping crush map')
            return
        try:
            crushmap = json.loads(outb)
        except ValueError:
            raise RuntimeError('unable to parse crush map')

        raw = crushmap.get('choose_args', {}).get('-1', [])
        weight_set = {}
        for b in raw:
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set

    def do_crush(self):
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self):
        self.log.info('do_osd_weight (not yet implemented)')

    def execute(self, plan):
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws) and \
           '-1' not in plan.initial.crush_dump.get('choose_args', {}):
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

        for osd, weight in plan.compat_ws.iteritems():
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), 'foo')
            commands.append(result)

        # new_weight
        reweightn = {}
        for osd, weight in plan.osd_weights.iteritems():
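            # OSD reweight values are 16.16 fixed-point on the wire:
            # 0x10000 represents a weight of 1.0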
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), 'foo')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        # wait for commands
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error on command')
                return
        self.log.debug('done')