| #!/usr/bin/python |
| # |
| # |
| # Copyright 2014, NICTA |
| # |
| # This software may be distributed and modified according to the terms of |
| # the BSD 2-Clause license. Note that NO WARRANTY is provided. |
| # See "LICENSE_BSD2.txt" for details. |
| # |
| # @TAG(NICTA_BSD) |
| # |
| # |
| # Very simple command-line test runner. |
| # |
| # Ignores timeouts. |
| # |
| |
| from __future__ import print_function |
| |
| import argparse |
| import atexit |
| import datetime |
| import fnmatch |
| import memusage |
| import os |
| import signal |
| import subprocess |
| import sys |
| import testspec |
| import time |
| import traceback |
| |
| # Try importing psutil. |
| PS_UTIL_AVAILABLE = False |
| try: |
| import psutil |
| from psutil import NoSuchProcess |
| PS_UTIL_AVAILABLE = True |
| if not hasattr(psutil.Process, "children") and hasattr(psutil.Process, "get_children"): |
| psutil.Process.children = psutil.Process.get_children |
| except ImportError: |
| pass |
| |
| ANSI_RESET = "\033[0m" |
| ANSI_RED = "\033[31;1m" |
| ANSI_GREEN = "\033[32m" |
| ANSI_YELLOW = "\033[33m" |
| ANSI_WHITE = "\033[38m" |
| ANSI_BOLD = "\033[1m" |
| |
| def output_color(color, s): |
| """Wrap the given string in the given color.""" |
| if os.isatty(sys.stdout.fileno()): |
| return color + s + ANSI_RESET |
| return s |
| |
| # Find a command in the PATH. |
| def which(file): |
| for path in os.environ["PATH"].split(os.pathsep): |
| candidate = os.path.join(path, file) |
| if os.path.exists(candidate) and os.access(candidate, os.X_OK): |
| return candidate |
| return None |
| |
| # |
| # Kill a process and all of its children. |
| # |
| # We attempt to handle races where a PID goes away while we |
| # are looking at it, but not where a PID has been reused. |
| # |
| def kill_family(parent_pid): |
| if not PS_UTIL_AVAILABLE: |
| return |
| |
| # Find process. |
| try: |
| process = psutil.Process(parent_pid) |
| except NoSuchProcess: |
| # Race. Nothing more to do. |
| return |
| |
| # SIGSTOP everyone first. |
| try: |
| process.suspend() |
| except NoSuchProcess: |
| # Race. Nothing more to do. |
| return |
| process_list = [process] |
| for child in process.children(recursive=True): |
| try: |
| child.suspend() |
| except NoSuchProcess: |
| # Race. |
| pass |
| else: |
| process_list.append(child) |
| |
| |
| # Now SIGKILL everyone. |
| process_list.reverse() |
| for p in process_list: |
| p.send_signal(signal.SIGKILL) |
| |
| # |
| # Run a single test. |
| # |
| # Return a tuple of (success, log, time_taken, memory_usage). |
| # |
| # Log only contains the output if verbose is *false*; otherwise, the |
| # log is output to stdout where we can't easily get to it. |
| # |
| def run_test(test, verbose=False): |
| # Construct the base command. |
| command = ["bash", "-c", test.command] |
| |
| # If we have a "pidspace" program, use that to ensure that programs |
| # that double-fork can't continue running after the parent command |
| # dies. |
| if which("pidspace") != None: |
| command = [which("pidspace"), "--"] + command |
| |
| # Print command and path. |
| if verbose: |
| print("\n") |
| if os.path.abspath(test.cwd) != os.path.abspath(os.getcwd()): |
| path = " [%s]" % os.path.relpath(test.cwd) |
| else: |
| path = "" |
| print(" command: %s%s" % (test.command, path)) |
| |
| output = subprocess.PIPE |
| |
| # Start timing. |
| start_time = datetime.datetime.now() |
| |
| # Start the command. |
| peak_mem_usage = None |
| try: |
| process = subprocess.Popen(command, |
| stdout=output, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, |
| cwd=test.cwd) |
| lines_iter = iter(process.stdout.readline, b"") |
| if verbose: |
| for line in lines_iter: |
| print(line, end='') |
| except: |
| output = "Exception while running test:\n\n%s" % (traceback.format_exc()) |
| atexit.register(lambda: kill_family(process.pid)) |
| if verbose: |
| print(output) |
| return (False, "ERROR", output, datetime.datetime.now() - start_time, peak_mem_usage) |
| |
| # If our program exits for some reason, attempt to kill the process. |
| atexit.register(lambda: kill_family(process.pid)) |
| |
| # Setup an alarm at the timeout. |
| was_timeout = [False] |
| def alarm_handler(sig, _): |
| was_timeout[0] = True |
| kill_family(process.pid) |
| signal.signal(signal.SIGALRM, alarm_handler) |
| signal.alarm(test.timeout) |
| |
| # Wait for the command to finish. |
| with memusage.process_poller(process.pid) as m: |
| (output, _) = process.communicate() |
| peak_mem_usage = m.peak_mem_usage() |
| |
| # Cancel the alarm. Small race here (if the alarm fires just after the |
| # process finished), but the returncode of our process should still be 0, |
| # and hence we won't interpret the result as a timeout. |
| signal.alarm(0) |
| |
| if output == None: |
| output = "" |
| if process.returncode == 0: |
| status = "pass" |
| elif was_timeout[0]: |
| status = "TIMEOUT" |
| else: |
| status = "FAILED" |
| return (process.returncode == 0, status, output, datetime.datetime.now() - start_time, peak_mem_usage) |
| |
| def print_line(ch='-'): |
| print("".join([ch for x in range(72)])) |
| |
| # Print a status line. |
| def print_test_line_start(test_name, verbose=False): |
| print(" running %-25s " % (test_name + " ..."), end="") |
| sys.stdout.flush() |
| |
| def print_test_line_end(test_name, color, status, time_taken, mem, verbose=False): |
| if mem: |
| # Report meory usage in gigabytes. |
| mem = '%5.2fGB' % round(float(mem) / 1024 / 1024 / 1024, 2) |
| if time_taken: |
| # Strip milliseconds for better printing. |
| time_taken = datetime.timedelta(seconds=int(time_taken.total_seconds())) |
| time_taken = '%8s' % time_taken |
| extras = ', '.join(filter(None, [time_taken, mem])) |
| |
| # Print status line. |
| print(output_color(color, "%-10s" % status) + ('(%s)' % extras if extras else '')) |
| if verbose: |
| print("") |
| print_line("=") |
| print() |
| sys.stdout.flush() |
| |
| def print_test_line(test_name, color, status, time_taken, mem, verbose=False): |
| print_test_line_start(test_name, verbose) |
| print_test_line_end(test_name, color, status, time_taken, mem, verbose) |
| |
| # |
| # Recursive glob |
| # |
| def rglob(base_dir, pattern): |
| matches = [] |
| for root, dirnames, filenames in os.walk(base_dir): |
| for filename in fnmatch.filter(filenames, pattern): |
| matches.append(os.path.join(root, filename)) |
| return matches |
| |
| # |
| # Exclusive rglob |
| # |
| def xrglob(base_dir, pattern, excl_dirs): |
| matches = [] |
| if base_dir in excl_dirs: |
| return matches |
| for root, dirnames, filenames in os.walk(base_dir): |
| for excl_dir in excl_dirs: |
| if root.startswith(os.path.abspath(excl_dir)): |
| break |
| else: |
| for filename in fnmatch.filter(filenames, pattern): |
| matches.append(os.path.join(root, filename)) |
| return matches |
| |
| |
| # |
| # Run tests. |
| # |
| def main(): |
| # Parse arguments |
| parser = argparse.ArgumentParser(description="Simple Regression Framework") |
| parser.add_argument("-s", "--strict", action="store_true", |
| help="be strict when parsing test XML files") |
| parser.add_argument("-d", "--directory", action="store", |
| metavar="DIR", help="directory to search for test files", |
| default=os.getcwd()) |
| parser.add_argument("-x", "--exclude", action="append", |
| metavar="DIR", help="directory to exclude while searching for test files (multiple -x can be given)", |
| default=[]) |
| parser.add_argument("--brief", action="store_true", |
| help="don't print failure logs at end of test run") |
| parser.add_argument("-l", "--list", action="store_true", |
| help="list known tests") |
| parser.add_argument("--legacy", action="store_true", |
| help="use legacy 'IsaMakefile' specs") |
| parser.add_argument("-v", "--verbose", action="store_true", |
| help="print test output") |
| parser.add_argument("--limit", action="store", |
| help="set line limit for logs", default=40) |
| parser.add_argument("tests", metavar="TESTS", |
| help="tests to run (defaults to all tests)", |
| nargs="*") |
| args = parser.parse_args() |
| |
| # Search for test files: |
| if not args.legacy: |
| test_xml = sorted(xrglob(args.directory, "tests.xml", args.exclude)) |
| tests = testspec.parse_test_files(test_xml, strict=args.strict) |
| else: |
| # Fetch legacy tests. |
| tests = testspec.legacy_testspec(args.directory) |
| |
| # List test names if requested. |
| if args.list: |
| for t in tests: |
| print(t.name) |
| sys.exit(0) |
| |
| # Calculate which tests should be run. |
| tests_to_run = [] |
| if len(args.tests) == 0: |
| tests_to_run = tests |
| else: |
| desired_names = set(args.tests) |
| bad_names = desired_names - set([t.name for t in tests]) |
| if len(bad_names) > 0: |
| parser.error("Unknown test names: %s" % (", ".join(sorted(bad_names)))) |
| tests_to_run = [t for t in tests if t.name in desired_names] |
| |
| # If running at least one test, and psutil is not available, print a warning. |
| if len(tests_to_run) > 0 and not PS_UTIL_AVAILABLE: |
| print("\n" |
| "Warning: 'psutil' module not available. Processes may not be correctly\n" |
| "stopped. Run\n" |
| "\n" |
| " pip install --user psutil\n" |
| "\n" |
| "to install.\n" |
| "\n") |
| |
| # Run the tests. |
| print("Running %d test(s)...\n" % len(tests_to_run)) |
| failed_tests = set() |
| failed_test_log = [] |
| for t in tests_to_run: |
| if len(t.depends & failed_tests) > 0: |
| print_test_line(t.name, ANSI_YELLOW, "skipped", None, None) |
| failed_tests.add(t.name) |
| continue |
| |
| # Run the test. |
| print_test_line_start(t.name, verbose=args.verbose) |
| (passed, status, log, time_taken, mem) = run_test(t, verbose=args.verbose) |
| |
| # Print result. |
| if not passed: |
| failed_tests.add(t.name) |
| failed_test_log.append((t.name, log, time_taken)) |
| print_test_line_end(t.name, ANSI_RED, "%s *" % status, time_taken, mem, verbose=args.verbose) |
| else: |
| print_test_line_end(t.name, ANSI_GREEN, status, time_taken, mem, verbose=args.verbose) |
| |
| # Print failure summaries unless requested not to. |
| if not args.brief and len(failed_test_log) > 0: |
| print("") |
| log_limit = int(args.limit) |
| for (failed_test, log, _) in failed_test_log: |
| print_line() |
| print("TEST FAILURE: %s" % failed_test) |
| print("") |
| log = log.rstrip("\n") + "\n" |
| lines = log.split("\n") |
| if len(lines) > 2 * log_limit: |
| lines = lines[:log_limit] + ["..."] + lines[-log_limit:] |
| print("\n".join(lines)) |
| print_line() |
| |
| # Print summary. |
| print(("\n\n" |
| + output_color(ANSI_WHITE, "%d/%d tests succeeded.") + "\n") |
| % (len(tests_to_run) - len(failed_tests), len(tests_to_run))) |
| if len(failed_tests) > 0: |
| print(output_color(ANSI_RED, "Tests failed.") + "\n") |
| sys.exit(1) |
| else: |
| print(output_color(ANSI_GREEN, "All tests passed.") + "\n") |
| sys.exit(0) |
| |
| |
| if __name__ == "__main__": |
| main() |
| |
| |