# pylint: disable=C0301,C0103,C0111
from __future__ import print_function
from sys import platform
import os
import sys
import signal
import socket
import fnmatch
import subprocess
import psutil
import shutil
import tempfile
import uuid
import re
from collections import OrderedDict, defaultdict
from time import monotonic, sleep
from typing import List, Dict, Tuple, Set
from argparse import Namespace
import robot
import xml.etree.ElementTree as ET
from tests_engine import TestResult
this_path = os.path.abspath(os.path.dirname(__file__))
def install_cli_arguments(parser):
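    """Registers the Robot-Framework-specific command line options on the shared argument parser."""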
group = parser.add_mutually_exclusive_group()
group.add_argument("--robot-framework-remote-server-full-directory",
dest="remote_server_full_directory",
action="store",
help="Full location of robot framework remote server binary.")
group.add_argument("--robot-framework-remote-server-directory-prefix",
dest="remote_server_directory_prefix",
action="store",
default=os.path.join(this_path, '../output/bin'),
help="Directory of robot framework remote server binary. This is concatenated with current configuration to create full path.")
parser.add_argument("--robot-framework-remote-server-name",
dest="remote_server_name",
action="store",
default="Renode.exe",
help="Name of robot framework remote server binary.")
parser.add_argument("--robot-framework-remote-server-port", "-P",
dest="remote_server_port",
action="store",
default=0,
type=int,
help="Port of robot framework remote server binary. Use '0' to automatically select any unused private port.")
parser.add_argument("--enable-xwt",
dest="enable_xwt",
action="store_true",
default=False,
help="Enables support for XWT.")
parser.add_argument("--show-log",
dest="show_log",
action="store_true",
default=False,
help="Display log messages in console (might corrupt robot summary output).")
parser.add_argument("--keep-renode-output",
dest="keep_renode_output",
action="store_true",
default=False,
help=" ".join([
"Redirect Renode stdout and stderr to log files.",
"Only non-empty log files are kept (i.e. up to 2 per suite).",
"This is separate from the usual logs generated by RobotFramework.",
"Implies --show-log (output is redirected and does not appear in console).",
]))
parser.add_argument("--verbose",
dest="verbose",
action="store_true",
default=False,
help="Print verbose info from Robot Framework.")
parser.add_argument("--hot-spot",
dest="hotspot",
action="store",
default=None,
help="Test given hot spot action.")
parser.add_argument("--variable",
dest="variables",
action="append",
default=None,
help="Variable to pass to Robot.")
parser.add_argument("--css-file",
dest="css_file",
action="store",
default=os.path.join(this_path, '../lib/resources/styles/robot.css'),
help="Custom CSS style for the result files.")
parser.add_argument("--runner",
dest="runner",
action="store",
default="mono" if platform.startswith("linux") or platform == "darwin" else "none",
help=".NET runner.")
parser.add_argument("--net",
dest="runner",
action="store_const",
const="dotnet",
help="Use .NET Core runner (alias for --runner=dotnet).")
parser.add_argument("--debug-on-error",
dest="debug_on_error",
action="store_true",
default=False,
help="Enables the Renode User Interface when test fails.")
parser.add_argument("--cleanup-timeout",
dest="cleanup_timeout",
action="store",
default=3,
type=int,
help="Robot frontend process cleanup timeout.")
parser.add_argument("--listener",
action="append",
help="Path to additional progress listener (can be provided many times).")
parser.add_argument("--renode-config",
dest="renode_config",
action="store",
default=None,
help="Path to the Renode config file.")
parser.add_argument("--kill-stale-renode-instances",
dest="autokill_renode",
action="store_true",
default=False,
help="Automatically kill stale Renode instances without asking.")
parser.add_argument("--gather-execution-metrics",
dest="execution_metrics",
action="store_true",
default=False,
help="Gather execution metrics for each suite.")
def verify_cli_arguments(options):
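    """Validates option combinations (debug port, parallel jobs, fixed Robot port, CSS file) and exits with an error on conflicts."""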
    # The --port remote debugging option is not available on Windows, so skip these checks there
if platform != "win32":
if options.port == str(options.remote_server_port):
print('Port {} is reserved for Robot Framework remote server and cannot be used for remote debugging.'.format(options.remote_server_port))
sys.exit(1)
if options.port is not None and options.jobs != 1:
print("Debug port cannot be used in parallel runs")
sys.exit(1)
if options.css_file:
if not os.path.isabs(options.css_file):
options.css_file = os.path.join(this_path, options.css_file)
if not os.path.isfile(options.css_file):
print("Unable to find provided CSS file: {0}.".format(options.css_file))
sys.exit(1)
if options.remote_server_port != 0 and options.jobs != 1:
print("Parallel execution and fixed Robot port number options cannot be used together")
sys.exit(1)
def is_process_running(pid):
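    """Returns True if a process with the given pid exists and is not a zombie."""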
if not psutil.pid_exists(pid):
return False
proc = psutil.Process(pid)
# docs note: is_running() will return True also if the process is a zombie (p.status() == psutil.STATUS_ZOMBIE)
return proc.is_running() and proc.status() != psutil.STATUS_ZOMBIE
def is_port_available(port, autokill):
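    """Checks whether `port` can be bound on localhost; if not, optionally tries to free it by killing a stale Renode instance."""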
    port_handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        port_handle.bind(("localhost", port))
        return True
    except OSError:
        # The port is taken; it may be held by a stale Renode instance we can kill
        return can_be_freed_by_killing_other_job(port, autokill)
    finally:
        port_handle.close()
def can_be_freed_by_killing_other_job(port, autokill):
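    """Looks for a Renode process holding `port` and kills it, automatically with autokill or after asking the user. Returns True if the port was freed."""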
    if not autokill and not sys.stdin.isatty():
        # Without a terminal we cannot ask the user for confirmation
        return False
try:
for proc in [psutil.Process(pid) for pid in psutil.pids()]:
if '--robot-server-port' in proc.cmdline() and str(port) in proc.cmdline():
if not is_process_running(proc.pid):
# process is zombie
continue
if autokill:
result = 'y'
else:
print('It seems that Renode process (pid {}, name {}) is currently running on port {}'.format(proc.pid, proc.name(), port))
result = input('Do you want me to kill it? [y/N] ')
if result in ['Y', 'y']:
proc.kill()
return True
break
    except Exception:
        # Processes may disappear while we inspect them; fall through and report failure
        pass
return False
class KeywordsFinder(robot.model.SuiteVisitor):
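    """Robot suite visitor that counts occurrences of a given keyword and records the arguments of each occurrence."""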
def __init__(self, keyword):
self.keyword = keyword
        self.occurrences = 0
self.arguments = []
def visit_keyword(self, keyword):
if keyword.name == self.keyword:
            self.occurrences += 1
arguments = keyword.args
self.arguments.append(arguments)
def got_results(self):
        return self.occurrences > 0
class TestsFinder(robot.model.SuiteVisitor):
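    """Robot suite visitor that partitions tests into those that use the given keyword and those that do not."""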
def __init__(self, keyword):
self.keyword = keyword
self.tests_matching = []
self.tests_not_matching = []
def isMatching(self, test):
finder = KeywordsFinder(self.keyword)
test.visit(finder)
return finder.got_results()
def visit_test(self, test):
if self.isMatching(test):
self.tests_matching.append(test)
else:
self.tests_not_matching.append(test)
class RobotTestSuite(object):
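    """Represents a single .robot file: manages the Renode remote server process, runs the tests (repeating hot-spot tests once per hot-spot action) and collects the resulting Robot XML files."""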
instances_count = 0
robot_frontend_process = None
hotspot_action = ['None', 'Pause', 'Serialize']
# Used to share the port between all suites when running sequentially
remote_server_port = -1
retry_test_regex = re.compile(r"\[RETRY\] (PASS|FAIL) on (\d+)\. retry\.")
retry_suite_regex = re.compile(r"|".join((
r"\[Errno \d+\] Connection refused",
r"Connection to remote server broken: \[WinError \d+\]",
r"Connecting remote server at [^ ]+ failed",
"Getting keyword names from library 'Remote' failed",
)))
def __init__(self, path):
self.path = path
self._dependencies_met = set()
self.remote_server_directory = None
self.renode_pid = -1
self.remote_server_port = -1
# Subset of RobotTestSuite.log_files which are "owned" by the running instance
self.suite_log_files = None
self.tests_with_hotspots = []
self.tests_without_hotspots = []
def check(self, options, number_of_runs):
# Checking if there are no other jobs is moved to `prepare` as it is now possible to skip used ports
pass
def get_output_dir(self, options, iteration_index, suite_retry_index):
return os.path.join(
options.results_directory,
f"iteration{iteration_index}" if options.iteration_count > 1 else "",
f"retry{suite_retry_index}" if options.retry_count > 1 else "",
)
def prepare(self, options):
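        """Splits the suite's tests into hot-spot and non-hot-spot groups and, for sequential runs without --keep-renode-output, starts (or reuses) the shared Renode process."""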
RobotTestSuite.instances_count += 1
hotSpotTestFinder = TestsFinder(keyword="Handle Hot Spot")
suiteBuilder = robot.running.builder.TestSuiteBuilder()
suite = suiteBuilder.build(self.path)
suite.visit(hotSpotTestFinder)
self.tests_with_hotspots = [test.name for test in hotSpotTestFinder.tests_matching]
self.tests_without_hotspots = [test.name for test in hotSpotTestFinder.tests_not_matching]
# In parallel runs, each parallel group starts its own Renode process.
# The same is done in sequential runs with --keep-renode-output.
# see: run
if options.jobs == 1 and not options.keep_renode_output:
if not RobotTestSuite._is_frontend_running():
RobotTestSuite.robot_frontend_process = self._run_remote_server(options)
# Save port to reuse when running sequentially
RobotTestSuite.remote_server_port = self.remote_server_port
else:
# Restore port allocated by a previous suite
self.remote_server_port = RobotTestSuite.remote_server_port
@classmethod
def _is_frontend_running(cls):
return cls.robot_frontend_process is not None and is_process_running(cls.robot_frontend_process.pid)
def _run_remote_server(self, options, iteration_index=1, suite_retry_index=0):
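        """Starts a Renode instance (optionally under gdb or perf), waits for it to publish its Robot port in a temporary file and returns the process handle, or None on timeout."""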
if options.runner == 'dotnet':
remote_server_name = "Renode.dll"
if platform == "win32":
tfm = "net6.0-windows10.0.17763.0"
else:
tfm = "net6.0"
configuration = os.path.join(options.configuration, tfm)
else:
remote_server_name = options.remote_server_name
configuration = options.configuration
if options.remote_server_full_directory is not None:
if not os.path.isabs(options.remote_server_full_directory):
options.remote_server_full_directory = os.path.join(this_path, options.remote_server_full_directory)
self.remote_server_directory = options.remote_server_full_directory
else:
self.remote_server_directory = os.path.join(options.remote_server_directory_prefix, configuration)
remote_server_binary = os.path.join(self.remote_server_directory, remote_server_name)
if not os.path.isfile(remote_server_binary):
print("Robot framework remote server binary not found: '{}'! Did you forget to build?".format(remote_server_binary))
sys.exit(1)
if options.remote_server_port != 0 and not is_port_available(options.remote_server_port, options.autokill_renode):
print("The selected port {} is not available".format(options.remote_server_port))
sys.exit(1)
command = [remote_server_binary, '--robot-server-port', str(options.remote_server_port)]
if not options.show_log and not options.keep_renode_output:
command.append('--hide-log')
if not options.enable_xwt:
command.append('--disable-gui')
if options.debug_on_error:
command.append('--robot-debug-on-error')
if options.keep_temps:
command.append('--keep-temporary-files')
if options.renode_config:
command.append('--config')
command.append(options.renode_config)
if options.runner == 'mono':
command.insert(0, 'mono')
if options.port is not None:
if options.suspend:
print('Waiting for a debugger at port: {}'.format(options.port))
command.insert(1, '--debug')
command.insert(2, '--debugger-agent=transport=dt_socket,server=y,suspend={0},address=127.0.0.1:{1}'.format('y' if options.suspend else 'n', options.port))
elif options.debug_mode:
command.insert(1, '--debug')
options.exclude.append('skip_mono')
elif options.runner == 'dotnet':
command.insert(0, 'dotnet')
options.exclude.append('skip_dotnet')
renode_command = command
# if we started GDB, wait for the user to start Renode as a child process
if options.run_gdb:
command = ['gdb', '-nx', '-ex', 'handle SIGXCPU SIG33 SIG35 SIG36 SIGPWR nostop noprint', '--args'] + command
p = psutil.Popen(command, cwd=self.remote_server_directory, bufsize=1)
if options.keep_renode_output:
print("Note: --keep-renode-output is not supported when using --run-gdb")
print("Waiting for Renode process to start")
while True:
# We strip argv[0] because if we pass just `mono` to GDB it will resolve
# it to a full path to mono on the PATH, for example /bin/mono
renode_child = next((c for c in p.children() if c.cmdline()[1:] == renode_command[1:]), None)
if renode_child:
break
sleep(0.5)
self.renode_pid = renode_child.pid
elif options.perf_output_path:
pid_file_uuid = uuid.uuid4()
pid_filename = f'pid_file_{pid_file_uuid}'
command = ['perf', 'record', '-q', '-g', '-F', 'max'] + command + ['--pid-file', pid_filename]
perf_stdout_stderr_file_name = "perf_stdout_stderr"
            if options.keep_renode_output:
                print("Note: --keep-renode-output is not supported when profiling with perf")
            print(f"WARNING: perf stdout and stderr are being redirected to {perf_stdout_stderr_file_name}")
perf_stdout_stderr_file = open(perf_stdout_stderr_file_name, "w")
p = subprocess.Popen(command, cwd=self.remote_server_directory, bufsize=1, stdout=perf_stdout_stderr_file, stderr=perf_stdout_stderr_file)
pid_file_path = os.path.join(self.remote_server_directory, pid_filename)
perf_renode_timeout = 10
while not os.path.exists(pid_file_path) and perf_renode_timeout > 0:
sleep(0.5)
perf_renode_timeout -= 1
if perf_renode_timeout <= 0:
raise RuntimeError("Renode pid file could not be found, can't attach perf")
            with open(pid_file_path, 'r') as pid_file:
                # parse the pid so it can be used to build the renode-<pid> temp path below
                self.renode_pid = int(pid_file.read().strip())
else:
# Start Renode
if options.keep_renode_output:
output_dir = self.get_output_dir(options, iteration_index, suite_retry_index)
logs_dir = os.path.join(output_dir, 'logs')
os.makedirs(logs_dir, exist_ok=True)
file_name = os.path.splitext(os.path.basename(self.path))[0]
suite_name = RobotTestSuite._create_suite_name(file_name, None)
fout = open(os.path.join(logs_dir, f"{suite_name}.renode_stdout.log"), "wb", buffering=0)
ferr = open(os.path.join(logs_dir, f"{suite_name}.renode_stderr.log"), "wb", buffering=0)
p = subprocess.Popen(command, cwd=self.remote_server_directory, bufsize=1, stdout=fout, stderr=ferr)
self.renode_pid = p.pid
else:
p = subprocess.Popen(command, cwd=self.remote_server_directory, bufsize=1)
self.renode_pid = p.pid
countdown = 120
temp_dir = tempfile.gettempdir()
renode_port_file = os.path.join(temp_dir, f'renode-{self.renode_pid}', 'robot_port')
        # Renode writes its Robot port to this file once the server is up
        while countdown > 0:
            try:
                with open(renode_port_file) as f:
                    port_num = f.readline().strip()
                if port_num:
                    self.remote_server_port = int(port_num)
                    break
            except (OSError, ValueError):
                pass
            sleep(0.5)
            countdown -= 1
if countdown == 0:
print("Couldn't access port file for Renode instance pid {}".format(self.renode_pid))
self._close_remote_server(p, options)
return None
print('Started Renode instance on port {}; pid {}'.format(self.remote_server_port, self.renode_pid))
return p
def __move_perf_data(self, options):
        perf_data_path = os.path.join(self.remote_server_directory, "perf.data")
        if not os.path.isfile(perf_data_path):
            raise RuntimeError("perf.data file was not generated successfully")
if not os.path.isdir(options.perf_output_path):
raise RuntimeError(f"{options.perf_output_path} is not a valid directory path")
shutil.move(perf_data_path, options.perf_output_path)
def _close_remote_server(self, proc, options):
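        """Interrupts the Renode process and escalates to a hard kill if it does not exit within options.cleanup_timeout seconds; moves the perf data afterwards when profiling."""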
if proc:
print('Closing Renode pid {}'.format(proc.pid))
try:
process = psutil.Process(proc.pid)
                os.kill(proc.pid, signal.SIGINT)  # ask Renode to shut down gracefully
process.wait(timeout=options.cleanup_timeout)
if options.perf_output_path:
self.__move_perf_data(options)
except psutil.TimeoutExpired:
process.kill()
process.wait()
except psutil.NoSuchProcess:
                # evidently closed by other means
pass
if options.perf_output_path and proc.stdout:
proc.stdout.close()
def run(self, options, run_id=0, iteration_index=1, suite_retry_index=0):
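        """Runs the whole suite: non-hot-spot tests once, hot-spot tests once per hot-spot action. Helper files are skipped and yield True; otherwise the aggregated result is returned."""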
if self.path.endswith('renode-keywords.robot'):
print('Ignoring helper file: {}'.format(self.path))
return True
print('Running ' + self.path)
result = None
# in non-parallel runs there is only one Renode process for all runs,
# unless --keep-renode-output is enabled, in which case a new process
# is spawned for every suite to ensure logs are separate files.
# see: prepare
if options.jobs != 1 or options.keep_renode_output:
proc = self._run_remote_server(options, iteration_index, suite_retry_index)
else:
proc = None
def get_result():
return result if result is not None else TestResult(True, None)
start_timestamp = monotonic()
if any(self.tests_without_hotspots):
result = get_result().ok and self._run_inner(options.fixture,
None,
self.tests_without_hotspots,
options,
iteration_index,
suite_retry_index)
if any(self.tests_with_hotspots):
for hotspot in RobotTestSuite.hotspot_action:
if options.hotspot and options.hotspot != hotspot:
continue
result = get_result().ok and self._run_inner(options.fixture,
hotspot,
self.tests_with_hotspots,
options,
iteration_index,
suite_retry_index)
end_timestamp = monotonic()
if result is None:
print(f'No tests executed for suite {self.path}', flush=True)
else:
status = 'finished successfully' if result.ok else 'failed'
exec_time = round(end_timestamp - start_timestamp, 2)
print(f'Suite {self.path} {status} in {exec_time} seconds.', flush=True)
self._close_remote_server(proc, options)
# make sure renode is still alive when a non-parallel run depends on it
if options.jobs == 1 and not options.keep_renode_output:
if not self._is_frontend_running():
print("Renode has unexpectedly died when running sequentially! Trying to respawn before continuing...")
RobotTestSuite.robot_frontend_process = self._run_remote_server(options, iteration_index, suite_retry_index)
# Save port to reuse when running sequentially
RobotTestSuite.remote_server_port = self.remote_server_port
return get_result()
def _get_dependencies(self, test_case):
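        """Recursively resolves the tests that must run before `test_case`.
        Dependencies are declared with the `Requires` and `Provides` keywords
        in the .robot file, e.g. (illustrative snippet, not from this repo):
            Boot Machine
                Provides          booted-machine
            Run Program
                Requires          booted-machine
        A test may carry at most one `Requires`, and each state must have
        exactly one providing test; violations raise an Exception."""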
suiteBuilder = robot.running.builder.TestSuiteBuilder()
suite = suiteBuilder.build(self.path)
test = next(t for t in suite.tests if hasattr(t, 'name') and t.name == test_case)
requirements = [s.args[0] for s in test.body if hasattr(s, 'name') and s.name == 'Requires']
if len(requirements) == 0:
return set()
if len(requirements) > 1:
raise Exception('Too many requirements for a single test. At most one is allowed.')
providers = [t for t in suite.tests if any(hasattr(s, 'name') and s.name == 'Provides' and s.args[0] == requirements[0] for s in t.body)]
if len(providers) > 1:
            raise Exception('Too many providers for state {0} found: {1}'.format(requirements[0], ', '.join(p.name for p in providers)))
if len(providers) == 0:
raise Exception('No provider for state {0} found'.format(requirements[0]))
res = self._get_dependencies(providers[0].name)
res.add(providers[0].name)
return res
def cleanup(self, options):
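        """On the last instance's teardown: closes the shared Renode process, merges the partial Robot XML files with rebot, inlines the CSS style into the HTML reports and prunes empty Renode log files (and, with --save-logs onfail, logs of suites without failures)."""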
assert hasattr(RobotTestSuite, "log_files"), "tests_engine.py did not assign RobotTestSuite.log_files"
RobotTestSuite.instances_count -= 1
if RobotTestSuite.instances_count == 0:
self._close_remote_server(RobotTestSuite.robot_frontend_process, options)
print("Aggregating all robot results")
grouped_log_files = self.group_log_paths(RobotTestSuite.log_files)
for iteration in range(1, options.iteration_count + 1):
for retry in range(options.retry_count):
output_dir = self.get_output_dir(options, iteration, retry)
log_files = grouped_log_files[(iteration, retry)]
# An output_dir can be missing for suite retries that were never "used"
if not os.path.isdir(output_dir) or not log_files:
continue
robot.rebot(
*log_files,
processemptysuite=True,
name='Test Suite',
loglevel="TRACE:INFO",
outputdir=output_dir,
output='robot_output.xml'
)
for file in set(log_files):
os.remove(file)
if options.css_file:
with open(options.css_file) as style:
style_content = style.read()
for report_name in ("report.html", "log.html"):
with open(os.path.join(output_dir, report_name), "a") as report:
report.write("<style media=\"all\" type=\"text/css\">")
report.write(style_content)
report.write("</style>")
if options.keep_renode_output:
logs_pattern = re.compile(r"(?P<suite_name>.*)\.renode_std(out|err)\.log")
for dirpath, _, fnames in os.walk(options.results_directory):
if os.path.basename(dirpath.rstrip("/")) != "logs":
continue
failed_suites = self.find_suites_with_fails(os.path.dirname(dirpath))
for fname in fnames:
fpath = os.path.join(dirpath, fname)
m = logs_pattern.match(fname)
if m:
# Remove empty logs
if os.path.getsize(fpath) == 0:
os.remove(fpath)
continue
if options.save_logs == "onfail":
                            # Remove logs of suites that had no failures
suite_name = m.group("suite_name")
if suite_name not in failed_suites:
os.remove(fpath)
# If the logs directory is empty, delete it
try:
os.rmdir(dirpath)
except OSError:
pass
def should_retry_suite(self, options, iteration_index, suite_retry_index):
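        """Inspects the suite's Robot XML output and returns True if the Renode Setup keyword failed or a crash-signature regex (retry_suite_regex) matches a test status or message."""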
tree = None
assert self.suite_log_files is not None, "The suite has not yet been run."
output_dir = self.get_output_dir(options, iteration_index, suite_retry_index)
        for log_file in self.suite_log_files:
            tree = ET.parse(os.path.join(output_dir, log_file))
root = tree.getroot()
for suite in root.iter('suite'):
if not suite.get('source', False):
continue # it is a tag used to group other suites without meaning on its own
# Always retry if our Setup failed.
for kw in suite.iter('kw'):
if kw.get('name') == 'Setup' and kw.get('library') == 'renode-keywords':
if kw.find('status').get('status') != 'PASS':
print('Renode Setup failure detected!')
return True
else:
break
# Look for regular expressions signifying a crash.
            # Suite Setup and Suite Teardown aren't checked here because they're in the `kw` tags.
for test in suite.iter('test'):
status = test.find('status') # only finds immediate children - important requirement
if status.text is not None and self.retry_suite_regex.search(status.text):
return True
for msg in test.iter("msg"):
if self.retry_suite_regex.search(msg.text):
return True
return False
@staticmethod
def _create_suite_name(test_name, hotspot):
return test_name + (' [HotSpot action: {0}]'.format(hotspot) if hotspot else '')
def _run_dependencies(self, test_cases_names, options, iteration_index=1, suite_retry_index=0):
test_cases_names.difference_update(self._dependencies_met)
if not any(test_cases_names):
return True
self._dependencies_met.update(test_cases_names)
return self._run_inner(None, None, test_cases_names, options, iteration_index, suite_retry_index)
def _run_inner(self, fixture, hotspot, test_cases_names, options, iteration_index=1, suite_retry_index=0):
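        """Builds and runs a Robot suite for the given tests (optionally filtered by `fixture` and parametrized with a hot-spot action) after running their dependencies. Returns a TestResult, False if a dependency failed, or None if no test matched."""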
file_name = os.path.splitext(os.path.basename(self.path))[0]
suite_name = RobotTestSuite._create_suite_name(file_name, hotspot)
output_dir = self.get_output_dir(options, iteration_index, suite_retry_index)
variables = [
'SKIP_RUNNING_SERVER:True',
'DIRECTORY:{}'.format(self.remote_server_directory),
'PORT_NUMBER:{}'.format(self.remote_server_port),
'RESULTS_DIRECTORY:{}'.format(output_dir),
]
if hotspot:
variables.append('HOTSPOT_ACTION:' + hotspot)
if options.debug_mode:
variables.append('CONFIGURATION:Debug')
if options.debug_on_error:
variables.append('HOLD_ON_ERROR:True')
if options.execution_metrics:
variables.append('CREATE_EXECUTION_METRICS:True')
if options.save_logs == "always":
variables.append('SAVE_LOGS_WHEN:Always')
if options.runner == 'dotnet':
variables.append('BINARY_NAME:Renode.dll')
variables.append('RENODE_PID:{}'.format(self.renode_pid))
variables.append('NET_PLATFORM:True')
else:
options.exclude.append('profiling')
if options.variables:
variables += options.variables
test_cases = [(test_name, '{0}.{1}'.format(suite_name, test_name)) for test_name in test_cases_names]
if fixture:
test_cases = [x for x in test_cases if fnmatch.fnmatch(x[1], '*' + fixture + '*')]
if len(test_cases) == 0:
return None
deps = set()
for test_name in (t[0] for t in test_cases):
deps.update(self._get_dependencies(test_name))
if not self._run_dependencies(deps, options, iteration_index, suite_retry_index):
return False
output_formatter = 'robot_output_formatter_verbose.py' if options.verbose else 'robot_output_formatter.py'
listeners = [os.path.join(this_path, output_formatter)]
if options.listener:
listeners += options.listener
if options.retry_count > 1:
listeners += [f'RetryFailed:{options.retry_count - 1}']
metadata = {"HotSpot_Action": hotspot if hotspot else '-'}
log_file = os.path.join(output_dir, 'results-{0}{1}.robot.xml'.format(file_name, '_' + hotspot if hotspot else ''))
keywords_path = os.path.abspath(os.path.join(this_path, "renode-keywords.robot"))
keywords_path = keywords_path.replace(os.path.sep, "/") # Robot wants forward slashes even on Windows
# This variable is provided for compatibility with Robot files that use Resource ${RENODEKEYWORDS}
variables.append('RENODEKEYWORDS:{}'.format(keywords_path))
tools_path = os.path.join(os.path.dirname(this_path), "tools")
tools_path = tools_path.replace(os.path.sep, "/")
variables.append('RENODETOOLS:{}'.format(tools_path))
suite_builder = robot.running.builder.TestSuiteBuilder()
suite = suite_builder.build(self.path)
suite.resource.imports.create(type="Resource", name=keywords_path)
suite.configure(include_tags=options.include, exclude_tags=options.exclude,
include_tests=[t[1] for t in test_cases], metadata=metadata,
name=suite_name, empty_suite_ok=True)
# Provide default values for {Suite,Test}{Setup,Teardown}
if not suite.setup:
suite.setup.config(name="Setup")
if not suite.teardown:
suite.teardown.config(name="Teardown")
for test in suite.tests:
if not test.setup:
test.setup.config(name="Reset Emulation")
if not test.teardown:
test.teardown.config(name="Test Teardown")
result = suite.run(console='none', listener=listeners, exitonfailure=options.stop_on_error, output=log_file, log=None, loglevel='TRACE', report=None, variable=variables, skiponfailure=['non_critical', 'skipped'])
self.suite_log_files = []
file_name = os.path.splitext(os.path.basename(self.path))[0]
output_dir = self.get_output_dir(options, iteration_index, suite_retry_index)
if any(self.tests_without_hotspots):
log_file = os.path.join(output_dir, 'results-{0}.robot.xml'.format(file_name))
if os.path.isfile(log_file):
self.suite_log_files.append(log_file)
if any(self.tests_with_hotspots):
for hotspot in RobotTestSuite.hotspot_action:
if options.hotspot and options.hotspot != hotspot:
continue
log_file = os.path.join(output_dir, 'results-{0}{1}.robot.xml'.format(file_name, '_' + hotspot if hotspot else ''))
if os.path.isfile(log_file):
self.suite_log_files.append(log_file)
if options.runner == "mono":
self.copy_mono_logs(options, iteration_index, suite_retry_index)
return TestResult(result.return_code == 0, self.suite_log_files)
def copy_mono_logs(self, options: Namespace, iteration_index: int, suite_retry_index: int) -> None:
"""Copies 'mono_crash.*.json' files into the suite's logs directory.
These files are occasionally created when mono crashes. There are also 'mono_crash.*.blob'
files, but they contain heavier memory dumps and have questionable usefulness."""
output_dir = self.get_output_dir(options, iteration_index, suite_retry_index)
logs_dir = os.path.join(output_dir, "logs")
for dirpath, dirnames, fnames in os.walk(os.getcwd()):
# Do not descend into "logs" directories, to prevent later invocations from
# stealing files already moved by earlier invocations
            dirnames[:] = [d for d in dirnames if d != "logs"]
for fname in filter(lambda x: x.startswith("mono_crash.") and x.endswith(".json"), fnames):
os.makedirs(logs_dir, exist_ok=True)
src_fpath = os.path.join(dirpath, fname)
dest_fpath = os.path.join(logs_dir, fname)
print(f"Moving mono_crash file: '{src_fpath}' -> '{dest_fpath}'")
os.rename(src_fpath, dest_fpath)
@staticmethod
def find_failed_tests(path, file="robot_output.xml"):
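        """Walks `path` for `file` reports and returns the failed tests grouped into 'mandatory' and 'non_critical' sets, or None if there were no failures."""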
ret = {'mandatory': set(), 'non_critical': set()}
# Aggregate failed tests from all report files (can be multiple if iterations or retries were used)
for dirpath, _, fnames in os.walk(path):
for fname in filter(lambda x: x == file, fnames):
tree = ET.parse(os.path.join(dirpath, fname))
root = tree.getroot()
for suite in root.iter('suite'):
if not suite.get('source', False):
continue # it is a tag used to group other suites without meaning on its own
for test in suite.iter('test'):
status = test.find('status') # only finds immediate children - important requirement
if status.attrib['status'] == 'FAIL':
test_name = test.attrib['name']
suite_name = suite.attrib['name']
if suite_name == "Test Suite":
# If rebot is invoked with only 1 suite, it renames that suite to Test Suite
# instead of wrapping in a new top-level Test Suite. A workaround is to extract
# the suite name from the *.robot file name.
suite_name = os.path.basename(suite.attrib["source"]).rsplit(".", 1)[0]
if test.find("./tags/[tag='skipped']"):
continue # skipped test should not be classified as fail
if test.find("./tags/[tag='non_critical']"):
ret['non_critical'].add(f"{suite_name}.{test_name}")
else:
ret['mandatory'].add(f"{suite_name}.{test_name}")
if not ret['mandatory'] and not ret['non_critical']:
return None
return ret
@classmethod
def find_suites_with_fails(cls, path, file="robot_output.xml"):
"""Finds suites which contain at least one test case failure.
A suite may be successful and still contain failures, e.g. if the --retry option
was used and a test passed on a later attempt."""
ret = set()
for dirpath, _, fnames in os.walk(path):
for fname in filter(lambda x: x == file, fnames):
tree = ET.parse(os.path.join(dirpath, fname))
root = tree.getroot()
for suite in root.iter('suite'):
if not suite.get('source', False):
continue # it is a tag used to group other suites without meaning on its own
suite_name = suite.attrib['name']
if suite_name == "Test Suite":
# If rebot is invoked with only 1 suite, it renames that suite to Test Suite
# instead of wrapping in a new top-level Test Suite. A workaround is to extract
# the suite name from the *.robot file name.
suite_name = os.path.basename(suite.attrib["source"]).rsplit(".", 1)[0]
for test in suite.iter('test'):
if test.find("./tags/[tag='skipped']"):
continue # skipped test should not be classified as fail
status = test.find('status') # only finds immediate children - important requirement
if status.attrib["status"] == "FAIL":
ret.add(suite_name)
break
if status.text is not None and cls.retry_test_regex.search(status.text):
# Retried test cases still count as fails
ret.add(suite_name)
break
return ret
@staticmethod
def group_log_paths(paths: List[str]) -> Dict[Tuple[int, int], Set[str]]:
"""Breaks a list of log paths into subsets grouped by (iteration, suite_retry) pairs."""
re_path_indices_patterns = (
re.compile(r"\biteration(?P<iteration>\d+)/retry(?P<suite_retry>\d+)/"),
re.compile(r"\bretry(?P<suite_retry>\d+)/"),
re.compile(r"\biteration(?P<iteration>\d+)/"),
)
        ret = defaultdict(set)
for path in paths:
iteration = 1
suite_retry = 0
for pattern in re_path_indices_patterns:
match = pattern.search(path)
if match is None:
continue
try:
iteration = int(match.group("iteration"))
except IndexError:
pass
try:
suite_retry = int(match.group("suite_retry"))
except IndexError:
pass
ret[(iteration, suite_retry)].add(path)
return ret
@classmethod
def find_rerun_tests(cls, path):
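        """Collects the per-test execution history across iteration*/retry* result directories; each entry records the label, status, attempt count, tags and whether a Renode crash was detected."""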
def analyze_xml(label, retry_dir, file="robot_output.xml"):
try:
tree = ET.parse(os.path.join(retry_dir, file))
except FileNotFoundError:
return
root = tree.getroot()
for suite in root.iter('suite'):
if not suite.get('source', False):
continue # it is a tag used to group other suites without meaning on its own
suite_name = suite.attrib['name']
if suite_name == "Test Suite":
# If rebot is invoked with only 1 suite, it renames that suite to Test Suite
# instead of wrapping in a new top-level Test Suite. A workaround is to extract
# the suite name from the *.robot file name.
suite_name = os.path.basename(suite.attrib["source"]).rsplit(".", 1)[0]
for test in suite.iter('test'):
test_name = test.attrib['name']
tags = []
if test.find("./tags/[tag='skipped']"):
continue # skipped test should not be classified as fail
if test.find("./tags/[tag='non_critical']"):
tags.append("non_critical")
status = test.find('status') # only finds immediate children - important requirement
m = cls.retry_test_regex.search(status.text) if status.text is not None else None
# Check whether renode crashed during this test
has_renode_crashed = False
if status.text is not None and cls.retry_suite_regex.search(status.text):
has_renode_crashed = True
else:
has_renode_crashed = any(cls.retry_suite_regex.search(msg.text) for msg in test.iter("msg"))
status_str = status.attrib["status"]
nth = (1 + int(m.group(2))) if m else 1
key = f"{suite_name}.{test_name}"
                    data.setdefault(key, []).append({
"label": label, # e.g. "retry0", "retry1", "iteration1/retry2", ...
"status": status_str, # e.g. "PASS", "FAIL", "SKIP", ...
"nth": nth, # The number of test case attempts that led to the above status
"tags": tags, # e.g. ["non_critical"], [], ...
"crash": has_renode_crashed,
})
def analyze_iteration(iteration_dir):
iteration_dirname = os.path.basename(iteration_dir)
report_fpath = os.path.join(iteration_dir, "robot_output.xml")
if os.path.isfile(report_fpath):
analyze_xml(iteration_dirname, iteration_dir)
return
            i = 0
            while True:
                retry_dirpath = os.path.join(iteration_dir, f"retry{i}")
                if not os.path.isdir(retry_dirpath):
                    break
                analyze_xml(os.path.join(iteration_dirname, f"retry{i}"), retry_dirpath)
                i += 1
data = OrderedDict()
        i = 0
        while True:
            iteration_dirpath = os.path.join(path, f"iteration{i + 1}")
            retry_dirpath = os.path.join(path, f"retry{i}")
            if os.path.isdir(iteration_dirpath):
                analyze_iteration(iteration_dirpath)
            elif os.path.isdir(retry_dirpath):
                analyze_xml(f"retry{i}", retry_dirpath)
            else:
                break
            i += 1
return data