Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / scrub.py
1 """
2 Scrub osds
3 """
4 import contextlib
5 import gevent
6 import logging
7 import random
8 import time
9
10 import ceph_manager
11 from teuthology import misc as teuthology
12
13 log = logging.getLogger(__name__)
14
@contextlib.contextmanager
def task(ctx, config):
    """
    Scrub a randomly picked OSD at a fixed interval while this context
    is active.

    Expected configuration layout:

    scrub:
        frequency: <seconds between scrubs>
        deep: <bool for deepness>

    For instance:

    tasks:
    - ceph:
    - scrub:
        frequency: 30
        deep: 0
    """
    config = {} if config is None else config
    assert isinstance(config, dict), \
        'scrub task only accepts a dict for configuration'

    log.info('Beginning scrub...')

    # Exactly one remote is expected for the first monitor role; the
    # single-element unpacking raises otherwise.
    mon_role = teuthology.get_first_mon(ctx, config)
    [mon_remote] = ctx.cluster.only(mon_role).remotes.iterkeys()

    manager = ceph_manager.CephManager(
        mon_remote,
        ctx=ctx,
        logger=log.getChild('ceph_manager'),
        )

    # Poll every 10 seconds until the number of OSDs reported as up
    # reaches the number of OSD instances in the cluster.
    expected_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
    while True:
        up_osds = manager.get_osd_status()['up']
        if len(up_osds) >= expected_osds:
            break
        time.sleep(10)

    # Constructing the Scrubber spawns the background scrub loop.
    scrubber = Scrubber(manager, config)
    try:
        yield
    finally:
        # Always stop and reap the scrub loop, even if the body failed.
        log.info('joining scrub')
        scrubber.do_join()
63
class Scrubber(object):
    """
    Periodically issue a scrub against a randomly chosen OSD.

    The scrub loop is spawned with gevent from the constructor, so
    scrubbing starts as soon as an instance is created.  Call do_join()
    to ask the loop to stop and wait for it to finish.
    """
    def __init__(self, manager, config):
        """
        Record the OSDs that are currently up and spawn the scrub loop.

        :param manager: cluster manager object; this class calls its
                        wait_for_clean(), get_osd_status() and
                        raw_cluster_cmd() methods.
        :param config: dict of options ("frequency", "deep"), or None
                       to use the defaults.
        """
        self.ceph_manager = manager
        self.ceph_manager.wait_for_clean()

        # The candidate OSD list is captured once, here; it is not
        # refreshed while the loop runs.
        osd_status = self.ceph_manager.get_osd_status()
        self.osds = osd_status['up']

        self.config = config
        if self.config is None:
            self.config = dict()

        else:
            def tmp(x):
                """Local display"""
                # Parenthesised single-argument form prints the same
                # text on Python 2 and is valid syntax on Python 3.
                print(x)
            # Not used inside this class; kept so existing callers that
            # read the attribute keep working.
            self.log = tmp

        self.stopping = False

        log.info("spawning thread")

        self.thread = gevent.spawn(self.do_scrub)

    def do_join(self):
        """
        Ask the scrub loop to stop and wait for it to finish.

        The loop only checks the stop flag between scrubs, so this may
        block until the current sleep interval ends.
        """
        self.stopping = True
        self.thread.get()

    def do_scrub(self):
        """
        Scrub a random OSD every "frequency" seconds until stopped.

        Reads "frequency" (default 30) and "deep" (default 0) from the
        config; a truthy "deep" selects 'deep-scrub' instead of 'scrub'.
        """
        frequency = self.config.get("frequency", 30)
        deep = self.config.get("deep", 0)

        log.info("stopping %s", self.stopping)

        # The command does not change between iterations.
        cmd = 'deep-scrub' if deep else 'scrub'

        while not self.stopping:
            osd = str(random.choice(self.osds))

            # Renders as "scrubbing <osd>" / "deep-scrubbing <osd>".
            log.info('%sbing %s', cmd, osd)
            self.ceph_manager.raw_cluster_cmd('osd', cmd, osd)

            time.sleep(frequency)