Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / qa / tasks / workunit.py
1 """
2 Workunit task -- Run ceph on sets of specific clients
3 """
4 import logging
5 import pipes
6 import os
7 import re
8
9 from copy import deepcopy
10 from util import get_remote_for_role
11
12 from teuthology import misc
13 from teuthology.config import config as teuth_config
14 from teuthology.orchestra.run import CommandFailedError
15 from teuthology.parallel import parallel
16 from teuthology.orchestra import run
17
18 log = logging.getLogger(__name__)
19
20
21 class Refspec:
22     def __init__(self, refspec):
23         self.refspec = refspec
24
25     def __str__(self):
26         return self.refspec
27
28     def _clone(self, git_url, clonedir, opts=None):
29         if opts is None:
30             opts = []
31         return (['rm', '-rf', clonedir] +
32                 [run.Raw('&&')] +
33                 ['git', 'clone'] + opts +
34                 [git_url, clonedir])
35
36     def _cd(self, clonedir):
37         return ['cd', clonedir]
38
39     def _checkout(self):
40         return ['git', 'checkout', self.refspec]
41
42     def clone(self, git_url, clonedir):
43         return (self._clone(git_url, clonedir) +
44                 [run.Raw('&&')] +
45                 self._cd(clonedir) +
46                 [run.Raw('&&')] +
47                 self._checkout())
48
49
50 class Branch(Refspec):
51     def __init__(self, tag):
52         Refspec.__init__(self, tag)
53
54     def clone(self, git_url, clonedir):
55         opts = ['--depth', '1',
56                 '--branch', self.refspec]
57         return (self._clone(git_url, clonedir, opts) +
58                 [run.Raw('&&')] +
59                 self._cd(clonedir))
60
61
62 class Head(Refspec):
63     def __init__(self):
64         Refspec.__init__(self, 'HEAD')
65
66     def clone(self, git_url, clonedir):
67         opts = ['--depth', '1']
68         return (self._clone(git_url, clonedir, opts) +
69                 [run.Raw('&&')] +
70                 self._cd(clonedir))
71
72
73 def task(ctx, config):
74     """
75     Run ceph on all workunits found under the specified path.
76
77     For example::
78
79         tasks:
80         - ceph:
81         - ceph-fuse: [client.0]
82         - workunit:
83             clients:
84               client.0: [direct_io, xattrs.sh]
85               client.1: [snaps]
86             branch: foo
87
88     You can also run a list of workunits on all clients:
89         tasks:
90         - ceph:
91         - ceph-fuse:
92         - workunit:
93             tag: v0.47
94             clients:
95               all: [direct_io, xattrs.sh, snaps]
96
97     If you have an "all" section it will run all the workunits
98     on each client simultaneously, AFTER running any workunits specified
99     for individual clients. (This prevents unintended simultaneous runs.)
100
101     To customize tests, you can specify environment variables as a dict. You
102     can also specify a time limit for each work unit (defaults to 3h):
103
104         tasks:
105         - ceph:
106         - ceph-fuse:
107         - workunit:
108             sha1: 9b28948635b17165d17c1cf83d4a870bd138ddf6
109             clients:
110               all: [snaps]
111             env:
112               FOO: bar
113               BAZ: quux
114             timeout: 3h
115
116     This task supports roles that include a ceph cluster, e.g.::
117
118         tasks:
119         - ceph:
120         - workunit:
121             clients:
122               backup.client.0: [foo]
123               client.1: [bar] # cluster is implicitly 'ceph'
124
125     You can also specify an alternative top-level dir to 'qa/workunits', like
126     'qa/standalone', with::
127
128         tasks:
129         - install:
130         - workunit:
131             basedir: qa/standalone
132             clients:
133               client.0:
134                 - test-ceph-helpers.sh
135
136     :param ctx: Context
137     :param config: Configuration
138     """
139     assert isinstance(config, dict)
140     assert isinstance(config.get('clients'), dict), \
141         'configuration must contain a dictionary of clients'
142
143     # mimic the behavior of the "install" task, where the "overrides" are
144     # actually the defaults of that task. in other words, if none of "sha1",
145     # "tag", or "branch" is specified by a "workunit" tasks, we will update
146     # it with the information in the "workunit" sub-task nested in "overrides".
147     overrides = deepcopy(ctx.config.get('overrides', {}).get('workunit', {}))
148     refspecs = {'branch': Branch, 'tag': Refspec, 'sha1': Refspec}
149     if any(map(lambda i: i in config, refspecs.iterkeys())):
150         for i in refspecs.iterkeys():
151             overrides.pop(i, None)
152     misc.deep_merge(config, overrides)
153
154     for spec, cls in refspecs.iteritems():
155         refspec = config.get(spec)
156         if refspec:
157             refspec = cls(refspec)
158             break
159     if refspec is None:
160         refspec = Head()
161
162     timeout = config.get('timeout', '3h')
163
164     log.info('Pulling workunits from ref %s', refspec)
165
166     created_mountpoint = {}
167
168     if config.get('env') is not None:
169         assert isinstance(config['env'], dict), 'env must be a dictionary'
170     clients = config['clients']
171
172     # Create scratch dirs for any non-all workunits
173     log.info('Making a separate scratch dir for every client...')
174     for role in clients.iterkeys():
175         assert isinstance(role, basestring)
176         if role == "all":
177             continue
178
179         assert 'client' in role
180         created_mnt_dir = _make_scratch_dir(ctx, role, config.get('subdir'))
181         created_mountpoint[role] = created_mnt_dir
182
183     # Execute any non-all workunits
184     with parallel() as p:
185         for role, tests in clients.iteritems():
186             if role != "all":
187                 p.spawn(_run_tests, ctx, refspec, role, tests,
188                         config.get('env'),
189                         basedir=config.get('basedir','qa/workunits'),
190                         timeout=timeout)
191
192     # Clean up dirs from any non-all workunits
193     for role, created in created_mountpoint.items():
194         _delete_dir(ctx, role, created)
195
196     # Execute any 'all' workunits
197     if 'all' in clients:
198         all_tasks = clients["all"]
199         _spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'),
200                               config.get('basedir', 'qa/workunits'),
201                               config.get('subdir'), timeout=timeout)
202
203
204 def _client_mountpoint(ctx, cluster, id_):
205     """
206     Returns the path to the expected mountpoint for workunits running
207     on some kind of filesystem.
208     """
209     # for compatibility with tasks like ceph-fuse that aren't cluster-aware yet,
210     # only include the cluster name in the dir if the cluster is not 'ceph'
211     if cluster == 'ceph':
212         dir_ = 'mnt.{0}'.format(id_)
213     else:
214         dir_ = 'mnt.{0}.{1}'.format(cluster, id_)
215     return os.path.join(misc.get_testdir(ctx), dir_)
216
217
218 def _delete_dir(ctx, role, created_mountpoint):
219     """
220     Delete file used by this role, and delete the directory that this
221     role appeared in.
222
223     :param ctx: Context
224     :param role: "role.#" where # is used for the role id.
225     """
226     cluster, _, id_ = misc.split_role(role)
227     remote = get_remote_for_role(ctx, role)
228     mnt = _client_mountpoint(ctx, cluster, id_)
229     client = os.path.join(mnt, 'client.{id}'.format(id=id_))
230
231     # Remove the directory inside the mount where the workunit ran
232     remote.run(
233         args=[
234             'sudo',
235             'rm',
236             '-rf',
237             '--',
238             client,
239         ],
240     )
241     log.info("Deleted dir {dir}".format(dir=client))
242
243     # If the mount was an artificially created dir, delete that too
244     if created_mountpoint:
245         remote.run(
246             args=[
247                 'rmdir',
248                 '--',
249                 mnt,
250             ],
251         )
252         log.info("Deleted artificial mount point {dir}".format(dir=client))
253
254
255 def _make_scratch_dir(ctx, role, subdir):
256     """
257     Make scratch directories for this role.  This also makes the mount
258     point if that directory does not exist.
259
260     :param ctx: Context
261     :param role: "role.#" where # is used for the role id.
262     :param subdir: use this subdir (False if not used)
263     """
264     created_mountpoint = False
265     cluster, _, id_ = misc.split_role(role)
266     remote = get_remote_for_role(ctx, role)
267     dir_owner = remote.user
268     mnt = _client_mountpoint(ctx, cluster, id_)
269     # if neither kclient nor ceph-fuse are required for a workunit,
270     # mnt may not exist. Stat and create the directory if it doesn't.
271     try:
272         remote.run(
273             args=[
274                 'stat',
275                 '--',
276                 mnt,
277             ],
278         )
279         log.info('Did not need to create dir {dir}'.format(dir=mnt))
280     except CommandFailedError:
281         remote.run(
282             args=[
283                 'mkdir',
284                 '--',
285                 mnt,
286             ],
287         )
288         log.info('Created dir {dir}'.format(dir=mnt))
289         created_mountpoint = True
290
291     if not subdir:
292         subdir = 'client.{id}'.format(id=id_)
293
294     if created_mountpoint:
295         remote.run(
296             args=[
297                 'cd',
298                 '--',
299                 mnt,
300                 run.Raw('&&'),
301                 'mkdir',
302                 '--',
303                 subdir,
304             ],
305         )
306     else:
307         remote.run(
308             args=[
309                 # cd first so this will fail if the mount point does
310                 # not exist; pure install -d will silently do the
311                 # wrong thing
312                 'cd',
313                 '--',
314                 mnt,
315                 run.Raw('&&'),
316                 'sudo',
317                 'install',
318                 '-d',
319                 '-m', '0755',
320                 '--owner={user}'.format(user=dir_owner),
321                 '--',
322                 subdir,
323             ],
324         )
325
326     return created_mountpoint
327
328
329 def _spawn_on_all_clients(ctx, refspec, tests, env, basedir, subdir, timeout=None):
330     """
331     Make a scratch directory for each client in the cluster, and then for each
332     test spawn _run_tests() for each role.
333
334     See run_tests() for parameter documentation.
335     """
336     is_client = misc.is_type('client')
337     client_remotes = {}
338     created_mountpoint = {}
339     for remote, roles_for_host in ctx.cluster.remotes.items():
340         for role in roles_for_host:
341             if is_client(role):
342                 client_remotes[role] = remote
343                 created_mountpoint[role] = _make_scratch_dir(ctx, role, subdir)
344
345     for unit in tests:
346         with parallel() as p:
347             for role, remote in client_remotes.items():
348                 p.spawn(_run_tests, ctx, refspec, role, [unit], env,
349                         basedir,
350                         subdir,
351                         timeout=timeout)
352
353     # cleanup the generated client directories
354     for role, _ in client_remotes.items():
355         _delete_dir(ctx, role, created_mountpoint[role])
356
357
358 def _run_tests(ctx, refspec, role, tests, env, basedir,
359                subdir=None, timeout=None):
360     """
361     Run the individual test. Create a scratch directory and then extract the
362     workunits from git. Make the executables, and then run the tests.
363     Clean up (remove files created) after the tests are finished.
364
365     :param ctx:     Context
366     :param refspec: branch, sha1, or version tag used to identify this
367                     build
368     :param tests:   specific tests specified.
369     :param env:     environment set in yaml file.  Could be None.
370     :param subdir:  subdirectory set in yaml file.  Could be None
371     :param timeout: If present, use the 'timeout' command on the remote host
372                     to limit execution time. Must be specified by a number
373                     followed by 's' for seconds, 'm' for minutes, 'h' for
374                     hours, or 'd' for days. If '0' or anything that evaluates
375                     to False is passed, the 'timeout' command is not used.
376     """
377     testdir = misc.get_testdir(ctx)
378     assert isinstance(role, basestring)
379     cluster, type_, id_ = misc.split_role(role)
380     assert type_ == 'client'
381     remote = get_remote_for_role(ctx, role)
382     mnt = _client_mountpoint(ctx, cluster, id_)
383     # subdir so we can remove and recreate this a lot without sudo
384     if subdir is None:
385         scratch_tmp = os.path.join(mnt, 'client.{id}'.format(id=id_), 'tmp')
386     else:
387         scratch_tmp = os.path.join(mnt, subdir)
388     clonedir = '{tdir}/clone.{role}'.format(tdir=testdir, role=role)
389     srcdir = '{cdir}/{basedir}'.format(cdir=clonedir,
390                                        basedir=basedir)
391
392     git_url = teuth_config.get_ceph_qa_suite_git_url()
393     # if we are running an upgrade test, and ceph-ci does not have branches like
394     # `jewel`, so should use ceph.git as an alternative.
395     try:
396         remote.run(logger=log.getChild(role),
397                    args=refspec.clone(git_url, clonedir))
398     except CommandFailedError:
399         if git_url.endswith('/ceph-ci.git'):
400             alt_git_url = git_url.replace('/ceph-ci.git', '/ceph.git')
401         elif git_url.endswith('/ceph-ci'):
402             alt_git_url = re.sub(r'/ceph-ci$', '/ceph.git', git_url)
403         else:
404             raise
405         log.info(
406             "failed to check out '%s' from %s; will also try in %s",
407             refspec,
408             git_url,
409             alt_git_url,
410         )
411         remote.run(logger=log.getChild(role),
412                    args=refspec.clone(alt_git_url, clonedir))
413     remote.run(
414         logger=log.getChild(role),
415         args=[
416             'cd', '--', srcdir,
417             run.Raw('&&'),
418             'if', 'test', '-e', 'Makefile', run.Raw(';'), 'then', 'make', run.Raw(';'), 'fi',
419             run.Raw('&&'),
420             'find', '-executable', '-type', 'f', '-printf', r'%P\0'.format(srcdir=srcdir),
421             run.Raw('>{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)),
422         ],
423     )
424
425     workunits_file = '{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)
426     workunits = sorted(misc.get_file(remote, workunits_file).split('\0'))
427     assert workunits
428
429     try:
430         assert isinstance(tests, list)
431         for spec in tests:
432             log.info('Running workunits matching %s on %s...', spec, role)
433             prefix = '{spec}/'.format(spec=spec)
434             to_run = [w for w in workunits if w == spec or w.startswith(prefix)]
435             if not to_run:
436                 raise RuntimeError('Spec did not match any workunits: {spec!r}'.format(spec=spec))
437             for workunit in to_run:
438                 log.info('Running workunit %s...', workunit)
439                 args = [
440                     'mkdir', '-p', '--', scratch_tmp,
441                     run.Raw('&&'),
442                     'cd', '--', scratch_tmp,
443                     run.Raw('&&'),
444                     run.Raw('CEPH_CLI_TEST_DUP_COMMAND=1'),
445                     run.Raw('CEPH_REF={ref}'.format(ref=refspec)),
446                     run.Raw('TESTDIR="{tdir}"'.format(tdir=testdir)),
447                     run.Raw('CEPH_ARGS="--cluster {0}"'.format(cluster)),
448                     run.Raw('CEPH_ID="{id}"'.format(id=id_)),
449                     run.Raw('PATH=$PATH:/usr/sbin'),
450                     run.Raw('CEPH_BASE={dir}'.format(dir=clonedir)),
451                     run.Raw('CEPH_ROOT={dir}'.format(dir=clonedir)),
452                 ]
453                 if env is not None:
454                     for var, val in env.iteritems():
455                         quoted_val = pipes.quote(val)
456                         env_arg = '{var}={val}'.format(var=var, val=quoted_val)
457                         args.append(run.Raw(env_arg))
458                 args.extend([
459                     'adjust-ulimits',
460                     'ceph-coverage',
461                     '{tdir}/archive/coverage'.format(tdir=testdir)])
462                 if timeout and timeout != '0':
463                     args.extend(['timeout', timeout])
464                 args.extend([
465                     '{srcdir}/{workunit}'.format(
466                         srcdir=srcdir,
467                         workunit=workunit,
468                     ),
469                 ])
470                 remote.run(
471                     logger=log.getChild(role),
472                     args=args,
473                     label="workunit test {workunit}".format(workunit=workunit)
474                 )
475                 remote.run(
476                     logger=log.getChild(role),
477                     args=['sudo', 'rm', '-rf', '--', scratch_tmp],
478                 )
479     finally:
480         log.info('Stopping %s on %s...', tests, role)
481         remote.run(
482             logger=log.getChild(role),
483             args=[
484                 'rm', '-rf', '--', workunits_file, clonedir,
485             ],
486         )