18f4d712bde364691093c225ec01e35db74a3694
[vswitchperf.git] / tools / tasks.py
1 # Copyright 2015-2017 Intel Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #   http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Task management helper functions and classes.
16 """
17
18 import select
19 import subprocess
20 import logging
21 import threading
22 import sys
23 import os
24 import locale
25 import time
26 import pexpect
27
28 from conf import settings
29 from tools import systeminfo
30
31
32 CMD_PREFIX = 'cmd : '
33
34 def _get_stdout():
35     """Get stdout value for ``subprocess`` calls.
36     """
37     stdout = None
38
39     if settings.getValue('VERBOSITY') != 'debug':
40         stdout = open(os.devnull, 'wb')
41
42     return stdout
43
44
45 def run_task(cmd, logger, msg=None, check_error=False):
46     """Run task, report errors and log overall status.
47
48     Run given task using ``subprocess.Popen``. Log the commands
49     used and any errors generated. Prints stdout to screen if
50     in verbose mode and returns it regardless. Prints stderr to
51     screen always.
52
53     :param cmd: Exact command to be executed
54     :param logger: Logger to write details to
55     :param msg: Message to be shown to user
56     :param check_error: Throw exception on error
57
58     :returns: (stdout, stderr)
59     """
60     def handle_error(exception):
61         """Handle errors by logging and optionally raising an exception.
62         """
63         logger.error(
64             'Unable to execute %(cmd)s. Exception: %(exception)s',
65             {'cmd': ' '.join(cmd), 'exception': exception})
66         if check_error:
67             raise exception
68
69     stdout = []
70     stderr = []
71     my_encoding = locale.getdefaultlocale()[1]
72
73     if msg:
74         logger.info(msg)
75
76     # pylint: disable=too-many-nested-blocks
77     logger.debug('%s%s', CMD_PREFIX, ' '.join(cmd))
78     try:
79         proc = subprocess.Popen(map(os.path.expanduser, cmd),
80                                 stdout=subprocess.PIPE,
81                                 stderr=subprocess.PIPE, bufsize=0)
82
83         while True:
84             reads = [proc.stdout.fileno(), proc.stderr.fileno()]
85             ret = select.select(reads, [], [])
86
87             for file_d in ret[0]:
88                 if file_d == proc.stdout.fileno():
89                     while True:
90                         line = proc.stdout.readline()
91                         if not line:
92                             break
93                         if settings.getValue('VERBOSITY') == 'debug':
94                             sys.stdout.write(line.decode(my_encoding))
95                         stdout.append(line)
96                 if file_d == proc.stderr.fileno():
97                     while True:
98                         line = proc.stderr.readline()
99                         if not line:
100                             break
101                         sys.stderr.write(line.decode(my_encoding))
102                         stderr.append(line)
103
104             if proc.poll() is not None:
105                 break
106
107     except OSError as ex:
108         handle_error(ex)
109     else:
110         if proc.returncode:
111             ex = subprocess.CalledProcessError(proc.returncode, cmd, stderr)
112             handle_error(ex)
113
114     return ('\n'.join(sout.decode(my_encoding).strip() for sout in stdout),
115             ('\n'.join(sout.decode(my_encoding).strip() for sout in stderr)))
116
117 def update_pids(pid):
118     """update list of running pids, so they can be terminated at the end
119     """
120     try:
121         pids = settings.getValue('_EXECUTED_PIDS')
122         pids.append(pid)
123     except AttributeError:
124         pids = [pid]
125     settings.setValue('_EXECUTED_PIDS', pids)
126
127 def run_background_task(cmd, logger, msg):
128     """Run task in background and log when started.
129
130     Run given task using ``subprocess.Popen``. Log the command
131     used. Print stdout to screen if in verbose mode. Prints stderr
132     to screen always.
133
134     :param cmd: Exact command to be executed
135     :param logger: Logger to write details to
136     :param msg: Message to be shown to user
137
138     :returns: Process PID
139     """
140     logger.info(msg)
141     logger.debug('%s%s', CMD_PREFIX, ' '.join(cmd))
142
143     proc = subprocess.Popen(map(os.path.expanduser, cmd), stdout=_get_stdout(), bufsize=0)
144
145     update_pids(proc.pid)
146
147     return proc.pid
148
149
150 def run_interactive_task(cmd, logger, msg):
151     """Run a task interactively and log when started.
152
153     Run given task using ``pexpect.spawn``. Log the command used.
154     Performs neither validation of the process - if the process
155     successfully started or is still running - nor killing of the
156     process. The user must do both.
157
158     :param cmd: Exact command to be executed
159     :param logger: Logger to write details to
160     :param msg: Message to be shown to user
161
162     :returns: ``pexpect.child`` object
163     """
164     logger.info(msg)
165     logger.debug('%s%s', CMD_PREFIX, cmd)
166     child = pexpect.spawnu(cmd)
167
168     if settings.getValue('VERBOSITY') == 'debug':
169         child.logfile_read = sys.stdout
170
171     return child
172
173 def terminate_task_subtree(pid, signal='-15', sleep=10, logger=None):
174     """Terminate given process and all its children
175
176     Function will sent given signal to the process. In case
177     that process will not terminate within given sleep interval
178     and signal was not SIGKILL, then process will be killed by SIGKILL.
179     After that function will check if all children of the process
180     are terminated and if not the same terminating procedure is applied
181     on any living child (only one level of children is considered).
182
183     :param pid: Process ID to terminate
184     :param signal: Signal to be sent to the process
185     :param sleep: Maximum delay in seconds after signal is sent
186     :param logger: Logger to write details to
187     """
188     try:
189         children = subprocess.check_output("pgrep -P " + str(pid), shell=True).decode().rstrip('\n').split()
190     except subprocess.CalledProcessError:
191         children = []
192
193     terminate_task(pid, signal, sleep, logger)
194
195     # just for case children were kept alive
196     for child in children:
197         terminate_task(child, signal, sleep, logger)
198
199 def terminate_task(pid, signal='-15', sleep=10, logger=None):
200     """Terminate process with given pid
201
202     Function will sent given signal to the process. In case
203     that process will not terminate within given sleep interval
204     and signal was not SIGKILL, then process will be killed by SIGKILL.
205
206     :param pid: Process ID to terminate
207     :param signal: Signal to be sent to the process
208     :param sleep: Maximum delay in seconds after signal is sent
209     :param logger: Logger to write details to
210     """
211     if systeminfo.pid_isalive(pid):
212         run_task(['sudo', 'kill', signal, str(pid)], logger)
213         logger.debug('Wait for process %s to terminate after signal %s', pid, signal)
214         for dummy in range(sleep):
215             time.sleep(1)
216             if not systeminfo.pid_isalive(pid):
217                 break
218
219         if signal.lstrip('-').upper() not in ('9', 'KILL', 'SIGKILL') and systeminfo.pid_isalive(pid):
220             terminate_task(pid, '-9', sleep, logger)
221
222     pids = settings.getValue('_EXECUTED_PIDS')
223     if pid in pids:
224         pids.remove(pid)
225         settings.setValue('_EXECUTED_PIDS', pids)
226
227 def terminate_all_tasks(logger):
228     """Terminate all processes executed by vsperf, just for case they were not
229     terminated by standard means.
230     """
231     pids = settings.getValue('_EXECUTED_PIDS')
232     if pids:
233         logger.debug('Following processes will be terminated: %s', pids)
234         for pid in pids:
235             terminate_task_subtree(pid, logger=logger)
236         settings.setValue('_EXECUTED_PIDS', [])
237
238 class Process(object):
239     """Control an instance of a long-running process.
240
241     This is basically a context-manager wrapper around the
242     ``run_interactive_task`` function above (with some extra helper
243     functions).
244     """
245     _cmd = None
246     _child = None
247     _logfile = None
248     _logger = logging.getLogger(__name__)
249     _expect = None
250     _timeout = -1
251     _proc_name = 'unnamed process'
252     _relinquish_thread = None
253
254     # context manager
255
256     def __enter__(self):
257         """Start process instance using context manager.
258         """
259         self.start()
260         return self
261
262     def __exit__(self, type_, value, traceback):
263         """Shutdown process instance.
264         """
265         self.kill()
266
267     # startup/shutdown
268
269     def start(self):
270         """Start process instance.
271         """
272         self._start_process()
273         if self._timeout > 0:
274             self._expect_process()
275
276     def _start_process(self):
277         """Start process instance.
278         """
279         cmd = ' '.join(settings.getValue('SHELL_CMD') +
280                        ['"%s"' % ' '.join(self._cmd)])
281
282         self._child = run_interactive_task(cmd, self._logger,
283                                            'Starting %s...' % self._proc_name)
284         self._child.logfile = open(self._logfile, 'w')
285
286     def expect(self, msg, timeout=None):
287         """Expect string from process.
288
289         Expect string and die if not received.
290
291         :param msg: String to expect.
292         :param timeout: Time to wait for string.
293
294         :returns: None
295         """
296         self._expect_process(msg, timeout)
297
298     def _expect_process(self, msg=None, timeout=None):
299         """Expect string from process.
300         """
301         if not msg:
302             msg = self._expect
303         if not timeout:
304             timeout = self._timeout
305
306         # we use exceptions rather than catching conditions in ``expect`` list
307         # as we want to fail catastrophically after handling; there is likely
308         # little we can do from within the scripts to fix issues such as
309         # hugepages not being mounted
310         try:
311             self._child.expect([msg], timeout=timeout)
312         except pexpect.EOF as exc:
313             self._logger.critical(
314                 'An error occurred. Please check the logs (%s) for more'
315                 ' information. Exiting...', self._logfile)
316             raise exc
317         except pexpect.TIMEOUT as exc:
318             self._logger.critical(
319                 'Failed to execute in \'%d\' seconds. Please check the logs'
320                 ' (%s) for more information. Exiting...',
321                 timeout, self._logfile)
322             self.kill()
323             raise exc
324         except (Exception, KeyboardInterrupt) as exc:
325             self._logger.critical('General exception raised. Exiting...')
326             self.kill()
327             raise exc
328
329     def kill(self, signal='-15', sleep=10):
330         """Kill process instance if it is alive.
331
332         :param signal: signal to be sent to the process
333         :param sleep: delay in seconds after signal is sent
334         """
335         if self.is_running():
336             terminate_task_subtree(self._child.pid, signal, sleep, self._logger)
337
338             if self.is_relinquished():
339                 self._relinquish_thread.join()
340
341         self._logger.info(
342             'Log available at %s', self._logfile)
343
344     def is_relinquished(self):
345         """Returns True if process is relinquished.
346
347         If relinquished the process is no longer controllable and can
348         only be killed.
349
350         :returns: True if process is relinquished, else False.
351         """
352         return self._relinquish_thread
353
354     def is_running(self):
355         """Returns True if process is running.
356
357         :returns: True if process is running, else False
358         """
359         return self._child and self._child.isalive()
360
361     def _affinitize_pid(self, core, pid):
362         """Affinitize a process with ``pid`` to ``core``.
363
364         :param core: Core to affinitize process to.
365         :param pid: Process ID to affinitize.
366
367         :returns: None
368         """
369         run_task(['sudo', 'taskset', '-c', '-p', str(core),
370                   str(pid)],
371                  self._logger)
372
373     def affinitize(self, core):
374         """Affinitize process to a specific ``core``.
375
376         :param core: Core to affinitize process to.
377
378         :returns: None
379         """
380         self._logger.info('Affinitizing process')
381
382         if self.is_running():
383             self._affinitize_pid(core, self._child.pid)
384
385     class ContinueReadPrintLoop(threading.Thread):
386         """Thread to read output from child and log.
387
388         Taken from: https://github.com/pexpect/pexpect/issues/90
389         """
390         def __init__(self, child):
391             self.child = child
392             threading.Thread.__init__(self)
393
394         def run(self):
395             while True:
396                 try:
397                     self.child.read_nonblocking()
398                 except (pexpect.EOF, pexpect.TIMEOUT):
399                     break
400
401     def relinquish(self):
402         """Relinquish control of process.
403
404         Give up control of application in order to ensure logging
405         continues for the application. After relinquishing control it
406         will no longer be possible to :func:`expect` anything.
407
408         This works around an issue described here:
409
410             https://github.com/pexpect/pexpect/issues/90
411
412         It is hoped that future versions of pexpect will avoid this
413         issue.
414         """
415         self._relinquish_thread = self.ContinueReadPrintLoop(self._child)
416         self._relinquish_thread.start()
417
418
419 class CustomProcess(Process):
420     """An sample implementation of ``Process``.
421
422     This is essentially a more detailed version of the
423     ``run_interactive_task`` function that checks for process execution
424     and kills the process (assuming use of the context manager).
425     """
426     def __init__(self, cmd, timeout, logfile, expect, name):
427         """Initialise process state.
428
429         :param cmd: Command to execute.
430         :param timeout: Time to wait for ``expect``.
431         :param logfile: Path to logfile.
432         :param expect: String to expect indicating startup. This is a
433             regex and should be escaped as such.
434         :param name: Name of process to use in logs.
435
436         :returns: None
437         """
438         self._cmd = cmd
439         self._logfile = logfile
440         self._expect = expect
441         self._proc_name = name
442         self._timeout = timeout