# blob: bf100367fcf449a04070f0c1ed6996cb41654db1 [file] [log] [blame]
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
import locale
import logging
import os
import re
import shlex
import signal
import subprocess
import time
from pathlib import Path
import serial
# Module-level logger used by the free functions and LoggingSerial in this file.
log = logging.getLogger(__name__)
class Process:
    """Utility class used to spawn and interact with processes.

    STDOUT and STDERR of the spawned process are redirected into log files
    in |logdir|; the same files are opened a second time for reading so that
    the output can be searched with find_in_output().
    """

    def __init__(self,
                 cmd,
                 logdir,
                 cwd=None,
                 startup_done_expect=None,
                 startup_timeout=None,
                 default_filter_func=None):
        """Configure (but do not yet start) a process.

        Args:
            cmd: Command line used to spawn the process. Either a string
                (split with shlex) or a sequence of arguments (each converted
                with str()).
            logdir: Directory used to store STDOUT and STDERR output.
            cwd: Working directory used to spawn process. None keeps the
                working directory of the calling process.
            startup_done_expect: Pattern used to check at process startup time.
            startup_timeout: Timeout in seconds for |startup_done_expect| check.
            default_filter_func: Default function to filter all stdout/stderr
                output with when matching it with find_in_output().
        """
        if isinstance(cmd, str):
            self.cmd = shlex.split(cmd)
        else:
            self.cmd = [str(c) for c in cmd]
        self.logdir = str(logdir)
        # Keep None as None: str(None) would be the literal directory name
        # "None", which makes subprocess.Popen() fail for the default value.
        self.cwd = None if cwd is None else str(cwd)
        self.startup_done_expect = startup_done_expect
        self.startup_timeout = startup_timeout
        self.proc = None
        self.logger = logging.getLogger(__name__)
        # Write handles passed to the child process ...
        self._f_stdout = None
        self._f_stderr = None
        # ... and independent read handles on the same files, used to search
        # the output.
        self._f_stdout_r = None
        self._f_stderr_r = None
        self.default_filter_func = default_filter_func

    def __del__(self):
        # Best-effort cleanup. Every step is guarded on its own so that one
        # failing step (e.g. run() was never called, or the process is already
        # gone) does not prevent the remaining resources from being released.
        proc = getattr(self, 'proc', None)
        if proc is not None:
            try:
                proc.kill()
            except Exception:
                pass
        for attr in ('_f_stdout', '_f_stderr', '_f_stdout_r', '_f_stderr_r'):
            fp = getattr(self, attr, None)
            if fp is None:
                continue
            try:
                fp.close()
            except Exception:
                pass

    def is_running(self):
        """ Check if the process is running

        Returns:
            True if the process has been started and has not terminated yet,
            False otherwise.
        """
        # Popen.poll() returns None as long as the process is still running.
        return self.proc is not None and self.proc.poll() is None

    def run(self):
        """Start process with command line configured in constructor.

        Returns:
            True once the process is started and, if |startup_done_expect| was
            given, the startup pattern has been found in the output.

        Raises:
            RuntimeError: The output ended without matching the startup
                pattern.
            subprocess.TimeoutExpired: The startup pattern was not found within
                |startup_timeout| seconds.
        """
        cmd_name = os.path.basename(self.cmd[0])

        # Enforce line-buffered STDOUT even when sending STDOUT/STDERR to file.
        # If applications don't fflush() STDOUT manually, STDOUT goes through
        # a 4kB buffer before we see any output, which prevents searching for
        # the string indicating a successful startup.
        # see discussion at http://www.pixelbeat.org/programming/stdio_buffering/
        cmd = ['stdbuf', '-oL'] + self.cmd
        self.logger.info("Running command " +
                         ' '.join(shlex.quote(w) for w in cmd))

        logfile_stdout = os.path.join(self.logdir,
                                      "{}.stdout".format(cmd_name))
        logfile_stderr = os.path.join(self.logdir,
                                      "{}.stderr".format(cmd_name))
        self.logger.debug("Capturing STDOUT at " + logfile_stdout)
        self.logger.debug("Capturing STDERR at " + logfile_stderr)

        self._f_stdout = open(logfile_stdout, 'w')
        self._f_stderr = open(logfile_stderr, 'w')
        self.proc = subprocess.Popen(cmd,
                                     cwd=self.cwd,
                                     universal_newlines=True,
                                     bufsize=1,  # Use line buffering
                                     stdin=subprocess.PIPE,
                                     stdout=self._f_stdout,
                                     stderr=self._f_stderr)

        self._f_stdout_r = open(logfile_stdout, 'r')
        self._f_stderr_r = open(logfile_stderr, 'r')

        # no startup match pattern given => startup done!
        if self.startup_done_expect is None:
            return True

        # check if the string indicating a successful startup appears in the
        # the program output (STDOUT or STDERR).
        try:
            init_done = self.find_in_output(pattern=self.startup_done_expect,
                                            timeout=self.startup_timeout)
            if init_done is None:
                raise RuntimeError('No match for pattern {!r} in startup.'
                                   .format(str(self.startup_done_expect)))
        except subprocess.TimeoutExpired as err:
            # On time-out, find_in_output will raise a TimeoutExpired exception
            # but (of course) it doesn't know the command it's running, so we
            # amend it as it goes past.
            assert err.cmd is None
            err.cmd = cmd
            raise

        self.logger.info("Startup sequence matched, startup done.")
        return True

    def terminate(self):
        """Terminates process started by run call."""
        if self.proc is not None:
            self.proc.terminate()

    def send_ctrl_c(self):
        """Sends SIGINT to process started by run call."""
        if self.proc is not None:
            self.proc.send_signal(signal.SIGINT)

    def expect(self, stdin_data=None, pattern=None, timeout=None):
        """Write data to STDIN and check if the output is as expected.

        Args:
            stdin_data: Data to send to STDIN. Nothing is written if None.
            pattern: Pattern to search for after sending |stdin_data|.
            timeout: Timeout in seconds for |pattern| check.

        Returns:
            True if |pattern| found (or no pattern was given), False if the
            output ended without a match.

        Raises:
            subprocess.TimeoutExpired: |pattern| was not found within
                |timeout| seconds.
        """
        # We don't get STDOUT/STDERR from subprocess.communicate() as it's
        # redirected to file. We need to read the files instead.

        # XXX: races (false positives) can happen here if output is generated
        # before the input is sent to the process.
        # NOTE(review): the read position is only moved to the end of STDOUT
        # when *no* pattern is given -- confirm this is the intended condition.
        if pattern is None:
            self._f_stdout_r.seek(0, 2)

        # The default stdin_data=None must not be passed to write(), which
        # only accepts strings.
        if stdin_data is not None:
            self.proc.stdin.write(stdin_data)
            self.proc.stdin.flush()

        if pattern is None:
            return True

        return self.find_in_output(pattern, timeout) is not None

    def find_in_output(self,
                       pattern,
                       timeout,
                       from_start=False,
                       filter_func=None):
        """Read STDOUT and STDERR to find an expected pattern.

        See find_in_files() for more documentation.

        Args:
            pattern: Pattern to search for.
            timeout: Timeout in seconds for the search.
            from_start: Search from the start of the captured output.
            filter_func: Function to filter each line with before matching;
                |self.default_filter_func| is used if None.
        """
        if filter_func is None:
            filter_func = self.default_filter_func

        def wait_func():
            """ Wait up to 0.2s until the process terminates.

            Returns:
                True if the subprocess terminated and a further wait will not
                produce more output, False otherwise.
            """
            try:
                self.proc.wait(timeout=0.2)
            except subprocess.TimeoutExpired:
                # The process did not yet terminate.
                return False

            # The process did terminate.
            return True

        return find_in_files([self._f_stdout_r, self._f_stderr_r],
                             pattern,
                             timeout,
                             filter_func=filter_func,
                             from_start=from_start,
                             wait_func=wait_func)
class LoggingSerial(serial.Serial):
    """ Access to a serial console which logs all read/written data to file. """

    def __init__(self,
                 *args,
                 log_dir_path,
                 default_filter_func=None,
                 **kwargs):
        """Open the serial port and the two log files.

        Args:
            *args: Positional arguments forwarded to serial.Serial.
            log_dir_path: Directory in which 'to-device.log' and
                'from-device.log' are created.
            default_filter_func: Default function to filter lines with in
                find_in_output().
            **kwargs: Keyword arguments forwarded to serial.Serial.
        """
        super().__init__(*args, **kwargs)

        # Path() makes a plain string directory work as well as a Path object.
        log_dir = Path(log_dir_path)
        self._log_to_device_fp = open(str(log_dir / 'to-device.log'), 'wb')
        self._log_from_device_fp = open(str(log_dir / 'from-device.log'),
                                        'wb')

        self.default_filter_func = default_filter_func

        log.debug("Logging UART communication for {} to {}".format(
            self.port, str(log_dir_path)))

    def read(self, size=1):
        """Read up to |size| bytes from the device and log them."""
        data = super().read(size)
        self._log_from_device_fp.write(data)

        # Explicitly flush the log file to ensure that data is present in the
        # log file when the conftest log dumping routines read the file after a
        # failed test.
        self._log_from_device_fp.flush()

        return data

    def write(self, data):
        """Write |data| to the device and log it."""
        retcode = super().write(data)
        self._log_to_device_fp.write(data)

        # Explicitly flush the log file to ensure that data is present in the
        # log file when the conftest log dumping routines read the file after a
        # failed test.
        self._log_to_device_fp.flush()

        return retcode

    def close(self):
        """Close the log files and the serial port."""
        # close() can run on a partially constructed object (e.g. from the
        # base class finalizer after the port failed to open), in which case
        # the log file attributes do not exist yet.
        log_fps = (getattr(self, '_log_to_device_fp', None),
                   getattr(self, '_log_from_device_fp', None))
        try:
            for fp in log_fps:
                if fp:
                    fp.close()
        finally:
            # Always release the port, even if closing a log file failed.
            super().close()

    def log_add_marker(self, text: str):
        """ Write a marker text into the UART send and receive log files """
        text_b = text.encode('UTF-8')
        for fp in (self._log_to_device_fp, self._log_from_device_fp):
            fp.write(text_b)
            # Flush like read()/write() do, so that the marker is visible to
            # anything reading the log files while they are still open.
            fp.flush()

    def drain_in(self, timeout=None, silence_time=.5):
        """ Read all available input data

        Args:
            timeout: Maximum time this function blocks, in seconds.
            silence_time: Consider the input drained if no new data can be read
                after this time, in seconds.

        Returns:
            The data read from the device.
        """
        t_end = None
        if timeout is not None:
            t_end = time.time() + timeout

        read_data = b''

        # Always read at least once, then keep reading for as long as new data
        # arrived during the last |silence_time| sleep.
        first_iteration = True
        while first_iteration or self.in_waiting != 0:
            if timeout is not None and time.time() >= t_end:
                break

            read_data += self.read(self.in_waiting)
            first_iteration = False
            time.sleep(silence_time)

        return read_data

    def find_in_output(self, pattern, timeout=None, filter_func=None):
        """ Expect a pattern to appear in one of the output lines.

        See the documentation of match_line() for a description of |pattern|.

        Lines are obtained with readline(); the timeout is only evaluated after
        each readline() call returns.

        Args:
            pattern: Pattern to search for
            timeout: Timeout in seconds for |pattern| check.
            filter_func: Function to filter the output with before applying
                |pattern|. If none, the |self.default_filter_func| is used.

        Returns:
            None if |pattern| was not found, the return value of match_line()
            otherwise.
        """
        if filter_func is None:
            filter_func = self.default_filter_func

        t_end = None
        if timeout is not None:
            t_end = time.time() + timeout

        line = self.readline()
        while True:
            m = match_line(line, pattern, filter_func)
            if m is not None:
                return m

            if timeout is not None and time.time() >= t_end:
                break

            line = self.readline()

        return None
def dump_temp_files(tmp_path):
    """Log the content of every file below |tmp_path| (typically logs).

    Everything is emitted through logging.debug(). Files named '.lock' are
    skipped. For anything that is not a regular file, for files larger than
    50 kiB and for files whose first 1024 bytes contain non-text bytes only a
    short placeholder is logged; all other files are logged line by line.

    Args:
        tmp_path: Directory which is walked recursively.
    """
    logging.debug(
        "================= DUMP OF ALL TEMPORARY FILES =================")

    # Gather all file paths first, then dump them one by one.
    all_paths = []
    for dirpath, _subdirs, filenames in os.walk(str(tmp_path)):
        all_paths.extend(Path(dirpath) / name for name in filenames)

    # Byte values which are regarded as text: a few control characters plus
    # everything from 0x20 upwards, except DEL (0x7f).
    text_bytes = bytearray({7, 8, 9, 10, 12, 13, 27} |
                           set(range(0x20, 0x100)) - {0x7f})

    def has_binary_bytes(chunk):
        # Deleting all text bytes leaves only the "binary" ones behind.
        return len(chunk.translate(None, text_bytes)) > 0

    for path in all_paths:
        if path.name == '.lock':
            continue

        logging.debug("vvvvvvvvvvvvvvvvvvvv {} vvvvvvvvvvvvvvvvvvvv".format(path))

        if path.is_file():
            size = path.stat().st_size
            if size > 50 * 1024:
                logging.debug(
                    "[File is too large to be shown ({} bytes).]".format(size))
            else:
                with open(str(path), 'rb') as stream:
                    if has_binary_bytes(stream.read(1024)):
                        logging.debug(
                            "[File contains {} bytes of binary data.]".format(
                                size))
                    else:
                        # Rewind: the first 1024 bytes were consumed by the
                        # binary check above.
                        stream.seek(0)
                        encoding = locale.getpreferredencoding()
                        for raw_line in stream:
                            text = raw_line.decode(encoding,
                                                   errors='backslashreplace')
                            logging.debug(text.rstrip())
        else:
            logging.debug("[Not a regular file.]")

        logging.debug(
            "^^^^^^^^^^^^^^^^^^^^ {} ^^^^^^^^^^^^^^^^^^^^\n".format(path))
def load_sw_over_spi(tmp_path,
                     spiflash_path,
                     sw_test_bin,
                     spiflash_args=None,
                     timeout=600):
    """ Use the spiflash utility to load software onto a device.

    Args:
        tmp_path: Directory used both as working directory of spiflash and to
            store its STDOUT/STDERR logs.
        spiflash_path: Path to the spiflash executable.
        sw_test_bin: Software image passed to spiflash with --input.
        spiflash_args: Optional iterable of additional command line arguments
            for spiflash.
        timeout: Maximum time in seconds to wait for spiflash to finish.

    Raises:
        subprocess.TimeoutExpired: spiflash did not finish within |timeout|
            seconds. The spiflash process is terminated in this case.
        AssertionError: spiflash exited with a non-zero return code.
    """
    log.info("Flashing device software from {} over SPI".format(
        str(sw_test_bin)))

    # None instead of a mutable [] default; any iterable is accepted.
    extra_args = [] if spiflash_args is None else list(spiflash_args)
    cmd_flash = [spiflash_path, '--input', sw_test_bin] + extra_args

    p_flash = Process(cmd_flash, logdir=tmp_path, cwd=tmp_path)
    p_flash.run()
    try:
        p_flash.proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Don't leave a hanging spiflash process behind.
        p_flash.terminate()
        raise

    # Raise explicitly instead of using an assert statement so that the check
    # is not removed when Python runs with -O; the exception type is kept.
    returncode = p_flash.proc.returncode
    if returncode != 0:
        raise AssertionError(
            "spiflash exited with return code {}".format(returncode))

    log.info("Device software flashed.")
def find_in_files(file_objects,
                  pattern,
                  timeout,
                  from_start=False,
                  filter_func=None,
                  wait_func=None):
    """Search a list of open file objects for a line matching a pattern.

    All files are read line by line from their current position. When no
    line matched, |wait_func| is called and the files are read again, until
    a match is found, the timeout expires, or |wait_func| signals the end of
    the input.

    See the documentation of match_line() for a description of |pattern|.

    Args:
        file_objects: List of file objects to read lines from.
        pattern: Pattern to search for
        timeout: Timeout in seconds for |pattern| check. Set to None to wait
            indefinitely.
        from_start: Rewind all file objects to their start before searching.
        filter_func: Function to filter each line with before applying
            |pattern|. Lines are matched unfiltered if None.
        wait_func: Function to call to wait. The function is expected to return
            True if no further output is expected in |file_objects| (this will
            end the wait loop before |timeout| expires), and False otherwise
            (more output could be produced). Defaults to sleeping for 200 ms.

    Returns:
        The return value of match_line() for the first matching line, or None
        if |wait_func| returned True and the remaining input did not match.

    Raises:
        subprocess.TimeoutExpired: on timeout. The exception has a |cmd| field
            of None.
    """
    deadline = None if timeout is None else time.time() + timeout

    if wait_func is None:
        def sleep_for_more_input():
            # Nothing tells us when the input ends: sleep for 200 ms and
            # report that more data could still arrive.
            time.sleep(.2)
            return False

        wait_func = sleep_for_more_input

    if from_start:
        for fobj in file_objects:
            fobj.seek(0)

    input_ended = False
    while True:
        for fobj in file_objects:
            for raw_line in fobj:
                found = match_line(raw_line.rstrip(), pattern, filter_func)
                if found is not None:
                    return found

        # The scan above was the final pass over data produced before the
        # wait function reported the end of the input.
        if input_ended:
            return None

        if deadline is not None and time.time() >= deadline:
            raise subprocess.TimeoutExpired(None, timeout)

        # A True result means that no more data will be produced (e.g. the
        # producing process terminated); one more scan is still needed to
        # check the data written in the meantime.
        input_ended = wait_func()
def match_line(line, pattern, filter_func=None):
    """
    Check if a single line matches a pattern.

    Trailing line endings (CR/LF) are stripped from |line| first; they are
    neither matched nor returned. If |filter_func| is given, the stripped line
    is passed through it and the result is what gets matched.

    |pattern| can be of two types:
    1. A regular expression (any object with a match attribute, or a re.Pattern
       object in Python 3.7+). The result of pattern.match(line) is returned
       (a re.Match object since Python 3.7, or None). Note that match() always
       matches from the beginning of the line.
    2. A string. The line matches if it starts with |pattern| (startswith()),
       in which case the full line is returned.

    Returns None if the line does not match.
    """
    line_ending = b'\r\n' if isinstance(line, bytes) else '\r\n'
    candidate = line.rstrip(line_ending)

    if filter_func is not None:
        candidate = filter_func(candidate)

    # TODO: Check for an instance of re.Pattern once we move to Python 3.7+
    if not hasattr(pattern, "match"):
        # Plain prefix comparison.
        return candidate if candidate.startswith(pattern) else None

    return pattern.match(candidate)
def filter_remove_device_sw_log_prefix(line):
    """
    Remove the file/line information in log messages produced by device software
    LOG() macros.

    Args:
        line: A single line of output, either str or bytes.

    Returns:
        |line| without the leading log prefix, in the same type as the input.
        Lines without such a prefix are returned unchanged.
    """
    # See base_log_internal_core() in lib/base/log.c for the format description.
    # The hyphen is placed last in the character class so that it is a literal
    # '-'. Written as '\.-_' it formed a range from '.' to '_', which excluded
    # '-' itself and therefore failed on file names containing a hyphen.
    pattern = r'^[IWEF?]\d{5} [a-zA-Z0-9._-]+:\d+\] '
    if isinstance(line, bytes):
        return re.sub(bytes(pattern, encoding='utf-8'), b'', line)
    else:
        return re.sub(pattern, '', line)
def filter_remove_sw_test_status_log_prefix(line):
    """
    Remove the logging prefix produced by the sw_test_status DV component.

    Args:
        line: A single line of output, either str or bytes.

    Returns:
        |line| without the sw_test_status prefix, in the same type as the
        input. Lines without such a prefix are returned unchanged.
    """
    # Example of a full prefix to be matched:
    # 1629002: (../src/lowrisc_dv_sw_test_status_0/sw_test_status_if.sv:42) [TOP.chip_earlgrey_verilator.u_sw_test_status_if]
    # The wildcards are non-greedy so that the match ends at the first '] '
    # after the hierarchy name. A greedy '.+' would extend to the last '] ' in
    # the line and thereby remove parts of the actual message.
    pattern = r'\d+: \(.+?/sw_test_status_if\.sv:\d+\) \[TOP\..+?\] '
    if isinstance(line, bytes):
        return re.sub(bytes(pattern, encoding='utf-8'), b'', line)
    else:
        return re.sub(pattern, '', line)