3 # Tool for running fuzz tests
5 # Copyright (C) 2014 Maria Kustova <maria.k@catit.be>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
27 from itertools import count
37 import simplejson as json
40 "Warning: Module for JSON processing is not found.\n" \
41 "'--config' and '--command' options are not supported."
# Inclusive bounds, in MB, between which the size of a backing file is
# randomly chosen; the chosen value is scaled to bytes before use.
MAX_BACKING_FILE_SIZE = 10
MIN_BACKING_FILE_SIZE = 1
48 def multilog(msg, *output):
49 """ Write an object to all of specified file descriptors."""
56 """ Convert a numeric value of a system signal to the string one
57 defined by the current operating system.
59 for k, v in signal.__dict__.items():
64 def run_app(fd, q_args):
65 """Start an application with specified arguments and return its exit code
66 or kill signal depending on the result of execution.
69 class Alarm(Exception):
70 """Exception for signal.alarm events."""
74 """Notify that an alarm event occurred."""
77 signal.signal(signal.SIGALRM, handler)
79 term_signal = signal.SIGKILL
80 devnull = open('/dev/null', 'r+')
81 process = subprocess.Popen(q_args, stdin=devnull,
82 stdout=subprocess.PIPE,
83 stderr=subprocess.PIPE)
85 out, err = process.communicate()
90 return process.returncode
93 os.kill(process.pid, term_signal)
94 fd.write('The command was terminated by timeout.\n')
class TestException(Exception):
    """Exception for errors raised by TestEnv objects."""
104 class TestEnv(object):
108 The class sets up test environment, generates backing and test images
109 and executes application under tests with specified arguments and a test
112 All logs are collected.
114 The summary log will contain short descriptions and statuses of tests in
117 The test log will include application (e.g. 'qemu-img') logs besides info
118 sent to the summary log.
121 def __init__(self, test_id, seed, work_dir, run_log,
122 cleanup=True, log_all=False):
123 """Set test environment in a specified work directory.
125 Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
126 'QEMU_IO' environment variables.
131 self.seed = str(random.randint(0, sys.maxint))
132 random.seed(self.seed)
134 self.init_path = os.getcwd()
135 self.work_dir = work_dir
136 self.current_dir = os.path.join(work_dir, 'test-' + test_id)
138 os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ')
139 self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ')
140 self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
141 ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
142 ['qemu-io', '$test_img', '-c', 'read $off $len'],
143 ['qemu-io', '$test_img', '-c', 'write $off $len'],
144 ['qemu-io', '$test_img', '-c',
145 'aio_read $off $len'],
146 ['qemu-io', '$test_img', '-c',
147 'aio_write $off $len'],
148 ['qemu-io', '$test_img', '-c', 'flush'],
149 ['qemu-io', '$test_img', '-c',
150 'discard $off $len'],
151 ['qemu-io', '$test_img', '-c',
153 for fmt in ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']:
154 self.commands.append(
155 ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
156 '$test_img', 'converted_image.' + fmt])
159 os.makedirs(self.current_dir)
161 print >>sys.stderr, \
162 "Error: The working directory '%s' cannot be used. Reason: %s"\
163 % (self.work_dir, e[1])
165 self.log = open(os.path.join(self.current_dir, "test.log"), "w")
166 self.parent_log = open(run_log, "a")
168 self.cleanup = cleanup
169 self.log_all = log_all
171 def _create_backing_file(self):
172 """Create a backing file in the current directory.
174 Return a tuple of a backing file name and format.
176 Format of a backing file is randomly chosen from all formats supported
177 by 'qemu-img create'.
179 # All formats supported by the 'qemu-img create' command.
180 backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'qcow2',
181 'file', 'qed', 'vpc'])
182 backing_file_name = 'backing_img.' + backing_file_fmt
183 backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
184 MAX_BACKING_FILE_SIZE) * (1 << 20)
185 cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
186 backing_file_name, str(backing_file_size)]
187 temp_log = StringIO.StringIO()
188 retcode = run_app(temp_log, cmd)
191 return (backing_file_name, backing_file_fmt)
193 multilog("Warning: The %s backing file was not created.\n\n"
194 % backing_file_fmt, sys.stderr, self.log, self.parent_log)
195 self.log.write("Log for the failure:\n" + temp_log.getvalue() +
200 def execute(self, input_commands=None, fuzz_config=None):
203 The method creates backing and test images, runs test app and analyzes
204 its exit status. If the application was killed by a signal, the test
207 if input_commands is None:
208 commands = self.commands
210 commands = input_commands
212 os.chdir(self.current_dir)
213 backing_file_name, backing_file_fmt = self._create_backing_file()
214 img_size = image_generator.create_image(
215 'test.img', backing_file_name, backing_file_fmt, fuzz_config)
216 for item in commands:
217 shutil.copy('test.img', 'copy.img')
218 # 'off' and 'len' are multiples of the sector size
220 start = random.randrange(0, img_size + 1, sector_size)
221 end = random.randrange(start, img_size + 1, sector_size)
223 if item[0] == 'qemu-img':
224 current_cmd = list(self.qemu_img)
225 elif item[0] == 'qemu-io':
226 current_cmd = list(self.qemu_io)
228 multilog("Warning: test command '%s' is not defined.\n"
229 % item[0], sys.stderr, self.log, self.parent_log)
231 # Replace all placeholders with their real values
234 .replace('$test_img', 'copy.img')
235 .replace('$off', str(start))
236 .replace('$len', str(end - start)))
237 current_cmd.append(c)
239 # Log string with the test header
240 test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
241 "Backing file: %s\n" \
242 % (self.seed, " ".join(current_cmd),
243 self.current_dir, backing_file_name)
244 temp_log = StringIO.StringIO()
246 retcode = run_app(temp_log, current_cmd)
248 multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
249 % (test_summary, os.path.basename(current_cmd[0]),
251 sys.stderr, self.log, self.parent_log)
255 self.log.write(temp_log.getvalue())
256 multilog("%sFAIL: Test terminated by signal %s\n\n"
257 % (test_summary, str_signal(-retcode)),
258 sys.stderr, self.log, self.parent_log)
262 self.log.write(temp_log.getvalue())
263 multilog("%sPASS: Application exited with the code " \
264 "'%d'\n\n" % (test_summary, retcode),
265 sys.stdout, self.log, self.parent_log)
267 os.remove('copy.img')
270 """Restore the test environment after a test execution."""
272 self.parent_log.close()
273 os.chdir(self.init_path)
274 if self.cleanup and not self.failed:
275 shutil.rmtree(self.current_dir)
277 if __name__ == '__main__':
281 Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR
283 Set up test environment in TEST_DIR and run a test in it. A module for
284 test image generation should be specified via IMG_GENERATOR.
287 runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2
290 -h, --help display this help and exit
291 -d, --duration=NUMBER finish tests after NUMBER of seconds
292 -c, --command=JSON run tests for all commands specified in
294 -s, --seed=STRING seed for a test image generation,
295 by default will be generated randomly
296 --config=JSON take fuzzer configuration from the JSON
298 -k, --keep_passed don't remove folders of passed tests
299 -v, --verbose log information about passed tests
303 '--command' accepts a JSON array of commands. Each command presents
304 an application under test with all its parameters as a list of strings,
305 e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].
307 Supported application aliases: 'qemu-img' and 'qemu-io'.
309 Supported argument aliases: $test_img for the fuzzed image, $off
310 for an offset, $len for length.
312 Values for $off and $len will be generated based on the virtual disk
313 size of the fuzzed image.
315 Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
316 'QEMU_IO' environment variables.
318 '--config' accepts a JSON array of fields to be fuzzed, e.g.
319 '[["header"], ["header", "version"]]'.
321 Each of the list elements can consist of a complex image element only
322 as ["header"] or ["feature_name_table"] or an exact field as
323 ["header", "version"]. In the first case random portion of the element
324 fields will be fuzzed, in the second one the specified field will be
327 If '--config' argument is specified, fields not listed in
328 the configuration array will not be fuzzed.
331 def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
332 command, fuzz_config):
333 """Setup environment for one test and execute this test."""
335 test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
337 except TestException:
340 # Python 2.4 doesn't support 'finally' and 'except' in the same 'try'
344 test.execute(command, fuzz_config)
345 except TestException:
def should_continue(duration, start_time):
    """Return True if a new test can be started and False otherwise.

    Arguments:
    duration   -- time budget for the whole run in seconds, or None when
                  the run is not limited in time
    start_time -- moment the run started, in seconds since the epoch

    A new test is allowed while fewer than 'duration' whole seconds have
    elapsed since 'start_time'.
    """
    # Without a time limit the run only stops on an external interruption
    if duration is None:
        return True
    # The current time is truncated to whole seconds before comparing
    elapsed = int(time.time()) - start_time
    return elapsed < duration
356 opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
357 ['command=', 'help', 'seed=', 'config=',
358 'keep_passed', 'verbose', 'duration='])
359 except getopt.error as e:
360 print >>sys.stderr, \
361 "Error: %s\n\nTry 'runner.py --help' for more information" % e
370 for opt, arg in opts:
371 if opt in ('-h', '--help'):
374 elif opt in ('-c', '--command'):
376 command = json.loads(arg)
377 except (TypeError, ValueError, NameError) as e:
378 print >>sys.stderr, \
379 "Error: JSON array of test commands cannot be loaded.\n" \
382 elif opt in ('-k', '--keep_passed'):
384 elif opt in ('-v', '--verbose'):
386 elif opt in ('-s', '--seed'):
388 elif opt in ('-d', '--duration'):
390 elif opt == '--config':
392 config = json.loads(arg)
393 except (TypeError, ValueError, NameError) as e:
394 print >>sys.stderr, \
395 "Error: JSON array with the fuzzer configuration cannot" \
396 " be loaded\nReason: %s" % e
399 if not len(args) == 2:
400 print >>sys.stderr, \
401 "Expected two parameters\nTry 'runner.py --help'" \
402 " for more information."
405 work_dir = os.path.realpath(args[0])
406 # run_log is created in 'main', because multiple tests are expected to
408 run_log = os.path.join(work_dir, 'run.log')
410 # Add the path to the image generator module to sys.path
411 sys.path.append(os.path.realpath(os.path.dirname(args[1])))
412 # Remove a script extension from image generator module if any
413 generator_name = os.path.splitext(os.path.basename(args[1]))[0]
416 image_generator = __import__(generator_name)
417 except ImportError as e:
418 print >>sys.stderr, \
419 "Error: The image generator '%s' cannot be imported.\n" \
420 "Reason: %s" % (generator_name, e)
424 resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
425 # If a seed is specified, only one test will be executed.
426 # Otherwise runner will terminate after a keyboard interruption
427 start_time = int(time.time())
429 while should_continue(duration, start_time):
431 run_test(str(test_id.next()), seed, work_dir, run_log, cleanup,
432 log_all, command, config)
433 except (KeyboardInterrupt, SystemExit):