| # Copyright lowRISC contributors. |
| # Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| import locale |
| import logging |
| import os |
| import re |
| import shlex |
| import signal |
| import subprocess |
| import time |
| from pathlib import Path |
| |
| import serial |
| |
| log = logging.getLogger(__name__) |
| |
| |
| class Process: |
| def __init__(self, |
| cmd, |
| logdir, |
| cwd=None, |
| startup_done_expect=None, |
| startup_timeout=None, |
| default_filter_func=None): |
| """Utility class used to spawn an interact with processes. |
| |
| Args: |
| cmd: Command line argument used to spawn process. |
| logdir: Directory used to store STDOUT and STDERR output. |
| cwd: Working directory used to spawn process. |
| startup_done_expect: Pattern used to check at process startup time. |
| startup_timeout: Timeout in seconds for |startup_done_expect| check. |
| default_filter_func: Default function to filter all stdout/stderr |
| output with when matching it with find_in_output(). |
| """ |
| if isinstance(cmd, str): |
| self.cmd = shlex.split(cmd) |
| else: |
| self.cmd = [str(c) for c in cmd] |
| self.logdir = str(logdir) |
| self.cwd = str(cwd) |
| self.startup_done_expect = startup_done_expect |
| self.startup_timeout = startup_timeout |
| self.proc = None |
| self.logger = logging.getLogger(__name__) |
| |
| self._f_stdout = None |
| self._f_stderr = None |
| self._f_stdout_r = None |
| self._f_stderr_r = None |
| |
| self.default_filter_func = default_filter_func |
| |
| def __del__(self): |
| try: |
| self.proc.kill() |
| self._f_stdout.close() |
| self._f_stderr.close() |
| self._f_stdout_r.close() |
| self._f_stderr_r.close() |
| except Exception: |
| pass |
| |
| def is_running(self): |
| """ Check if the process is running |
| |
| Returns: |
| True if the process is running, False otherwise |
| """ |
| return self.proc and self.proc.poll() is not None |
| |
| def run(self): |
| """Start process with command line configured in constructor.""" |
| cmd_name = os.path.basename(self.cmd[0]) |
| |
| # Enforce line-buffered STDOUT even when sending STDOUT/STDERR to file. |
| # If applications don't fflush() STDOUT manually, STDOUT goes through |
| # a 4kB buffer before we see any output, which prevents searching for |
| # the string indicating a successful startup. |
| # see discussion at http://www.pixelbeat.org/programming/stdio_buffering/ |
| cmd = ['stdbuf', '-oL'] + self.cmd |
| self.logger.info("Running command " + |
| ' '.join(shlex.quote(w) for w in cmd)) |
| |
| logfile_stdout = os.path.join(self.logdir, |
| "{}.stdout".format(cmd_name)) |
| logfile_stderr = os.path.join(self.logdir, |
| "{}.stderr".format(cmd_name)) |
| self.logger.debug("Capturing STDOUT at " + logfile_stdout) |
| self.logger.debug("Capturing STDERR at " + logfile_stderr) |
| |
| self._f_stdout = open(logfile_stdout, 'w') |
| self._f_stderr = open(logfile_stderr, 'w') |
| self.proc = subprocess.Popen(cmd, |
| cwd=self.cwd, |
| universal_newlines=True, |
| bufsize=1, # Use line buffering |
| stdin=subprocess.PIPE, |
| stdout=self._f_stdout, |
| stderr=self._f_stderr) |
| |
| self._f_stdout_r = open(logfile_stdout, 'r') |
| self._f_stderr_r = open(logfile_stderr, 'r') |
| |
| # no startup match pattern given => startup done! |
| if self.startup_done_expect is None: |
| return True |
| |
| # check if the string indicating a successful startup appears in the |
| # the program output (STDOUT or STDERR). |
| try: |
| init_done = self.find_in_output(pattern=self.startup_done_expect, |
| timeout=self.startup_timeout) |
| if init_done is None: |
| raise RuntimeError('No match for pattern {!r} in startup.' |
| .format(str(self.startup_done_expect))) |
| except subprocess.TimeoutExpired as err: |
| # On time-out, find_in_output will raise a TimeoutExpired exception |
| # but (of course) it doesn't know the command it's running, so we |
| # amend it as it goes past. |
| assert err.cmd is None |
| err.cmd = cmd |
| raise |
| |
| self.logger.info("Startup sequence matched, startup done.") |
| |
| return True |
| |
| def terminate(self): |
| """Terminates process started by run call.""" |
| if self.proc is not None: |
| self.proc.terminate() |
| |
| def send_ctrl_c(self): |
| """Sends SIGINT to process started by run call.""" |
| if self.proc is not None: |
| self.proc.send_signal(signal.SIGINT) |
| |
| def expect(self, stdin_data=None, pattern=None, timeout=None): |
| """Write data to STDIN and check if the output is as expected. |
| |
| Args: |
| stdin_data: Data to send to STDIN. |
| pattern: Pattern to search for after sending |stdin_data|. |
| timeout: Timeout in seconds for |pattern| check. |
| Returns: |
| True if |pattern| found, False otherwise. |
| """ |
| # We don't get STDOUT/STDERR from subprocess.communicate() as it's |
| # redirected to file. We need to read the files instead. |
| |
| # XXX: races (false positives) can happen here if output is generated |
| # before the input is sent to the process. |
| if pattern is None: |
| self._f_stdout_r.seek(0, 2) |
| |
| self.proc.stdin.write(stdin_data) |
| self.proc.stdin.flush() |
| |
| if pattern is None: |
| return True |
| |
| return self.find_in_output(pattern, timeout) is not None |
| |
| def find_in_output(self, |
| pattern, |
| timeout, |
| from_start=False, |
| filter_func=None): |
| """Read STDOUT and STDERR to find an expected pattern. |
| |
| See find_in_files() for more documentation. |
| """ |
| |
| if filter_func is None: |
| filter_func = self.default_filter_func |
| |
| def wait_func(): |
| """ Wait up to 0.2s until the process terminates. |
| |
| Returns: |
| True if the subprocess terminated and a further wait will not |
| produce more output, False otherwise. |
| """ |
| try: |
| self.proc.wait(timeout=0.2) |
| except subprocess.TimeoutExpired: |
| # The process did not yet terminate. |
| return False |
| |
| # The process did terminate. |
| return True |
| |
| return find_in_files([self._f_stdout_r, self._f_stderr_r], |
| pattern, |
| timeout, |
| filter_func=filter_func, |
| from_start=from_start, |
| wait_func=wait_func) |
| |
| |
| class LoggingSerial(serial.Serial): |
| """ Acess to a serial console which logs all read/written data to file. """ |
| def __init__(self, |
| *args, |
| log_dir_path, |
| default_filter_func=None, |
| **kwargs): |
| super().__init__(*args, **kwargs) |
| |
| self._log_to_device_fp = open(str(log_dir_path / 'to-device.log'), |
| 'wb') |
| self._log_from_device_fp = open(str(log_dir_path / 'from-device.log'), |
| 'wb') |
| |
| self.default_filter_func = default_filter_func |
| |
| log.debug("Logging UART communication for {} to {}".format( |
| self.port, str(log_dir_path))) |
| |
| def read(self, size=1): |
| bytes = super().read(size) |
| self._log_from_device_fp.write(bytes) |
| |
| # Explicitly flush the log file to ensure that data is present in the |
| # log file when the conftest log dumping routines read the file after a |
| # failed test. |
| self._log_from_device_fp.flush() |
| |
| return bytes |
| |
| def write(self, data): |
| retcode = super().write(data) |
| self._log_to_device_fp.write(data) |
| |
| # Explicitly flush the log file to ensure that data is present in the |
| # log file when the conftest log dumping routines read the file after a |
| # failed test. |
| self._log_to_device_fp.flush() |
| |
| return retcode |
| |
| def close(self): |
| if self._log_to_device_fp: |
| self._log_to_device_fp.close() |
| |
| if self._log_from_device_fp: |
| self._log_from_device_fp.close() |
| |
| super().close() |
| |
| def log_add_marker(self, text: str): |
| """ Write a marker text into the UART send and receive log files """ |
| text_b = text.encode('UTF-8') |
| self._log_to_device_fp.write(text_b) |
| self._log_from_device_fp.write(text_b) |
| |
| def drain_in(self, timeout=None, silence_time=.5): |
| """ Read all available input data |
| |
| Args: |
| timeout: Maximum time this function blocks, in seconds. |
| silence_time: Consider the input drained if no new data can be read |
| after this time, in seconds. |
| Returns: |
| The data read from the device. |
| """ |
| t_end = None |
| if timeout is not None: |
| t_end = time.time() + timeout |
| |
| read_data = b'' |
| |
| first_iteration = True |
| while first_iteration or self.in_waiting != 0: |
| if timeout is not None and time.time() >= t_end: |
| break |
| |
| read_data += self.read(self.in_waiting) |
| first_iteration = False |
| time.sleep(silence_time) |
| |
| return read_data |
| |
| def find_in_output(self, pattern, timeout=None, filter_func=None): |
| """ Expect a pattern to appear in one of the output lines. |
| |
| See the documentation of match_line() for a description of |pattern|. |
| |
| Args: |
| pattern: Pattern to search for |
| timeout: Timeout in seconds for |pattern| check. |
| filter_func: Function to filter the output with before applying |
| |pattern|. If none, the |self.default_filter_func| is used. |
| Returns: |
| None if |pattern| was not found, the return value of match_line() |
| otherwise. |
| """ |
| |
| if filter_func is None: |
| filter_func = self.default_filter_func |
| |
| t_end = None |
| if timeout is not None: |
| t_end = time.time() + timeout |
| |
| line = self.readline() |
| while True: |
| m = match_line(line, pattern, filter_func) |
| if m is not None: |
| return m |
| |
| if timeout is not None and time.time() >= t_end: |
| break |
| |
| line = self.readline() |
| |
| return None |
| |
| |
| def dump_temp_files(tmp_path): |
| """ Dump all files in a directory (typically logs) """ |
| logging.debug( |
| "================= DUMP OF ALL TEMPORARY FILES =================") |
| |
| tmp_files = [ |
| Path(root) / f for root, dirs, files in os.walk(str(tmp_path)) |
| for f in files |
| ] |
| |
| textchars = bytearray({7, 8, 9, 10, 12, 13, 27} | |
| set(range(0x20, 0x100)) - {0x7f}) |
| |
| def is_binary_string(bytes): |
| return bool(bytes.translate(None, textchars)) |
| |
| for f in tmp_files: |
| if f.name == '.lock': |
| continue |
| |
| logging.debug("vvvvvvvvvvvvvvvvvvvv {} vvvvvvvvvvvvvvvvvvvv".format(f)) |
| |
| if not f.is_file(): |
| logging.debug("[Not a regular file.]") |
| elif f.stat().st_size > 50 * 1024: |
| logging.debug("[File is too large to be shown ({} bytes).]".format( |
| f.stat().st_size)) |
| else: |
| with open(str(f), 'rb') as fp: |
| data = fp.read(1024) |
| if is_binary_string(data): |
| logging.debug( |
| "[File contains {} bytes of binary data.]".format( |
| f.stat().st_size)) |
| else: |
| fp.seek(0) |
| for line in fp: |
| line_str = line.decode(locale.getpreferredencoding(), |
| errors='backslashreplace') |
| logging.debug(line_str.rstrip()) |
| logging.debug( |
| "^^^^^^^^^^^^^^^^^^^^ {} ^^^^^^^^^^^^^^^^^^^^\n".format(f)) |
| |
| |
| def load_sw_over_spi(tmp_path, |
| spiflash_path, |
| sw_test_bin, |
| spiflash_args=[], |
| timeout=600): |
| """ Use the spiflash utility to load software onto a device. """ |
| |
| log.info("Flashing device software from {} over SPI".format( |
| str(sw_test_bin))) |
| |
| cmd_flash = [spiflash_path, '--input', sw_test_bin] + spiflash_args |
| p_flash = Process(cmd_flash, logdir=tmp_path, cwd=tmp_path) |
| p_flash.run() |
| p_flash.proc.wait(timeout=timeout) |
| assert p_flash.proc.returncode == 0 |
| |
| log.info("Device software flashed.") |
| |
| |
| def find_in_files(file_objects, |
| pattern, |
| timeout, |
| from_start=False, |
| filter_func=None, |
| wait_func=None): |
| """Find a pattern in a list of file objects (file descriptors) |
| |
| See the documentation of match_line() for a description of |pattern|. |
| |
| Args: |
| pattern: Pattern to search for |
| timeout: Timeout in seconds for |pattern| check. Set to None to wait |
| indefinitely. |
| from_start: Search from the start of the given file objects. |
| filter_func: Function to filter the output with before applying |
| |pattern|. If none, the |default_filter_func| is used. |
| wait_func: Function to call to wait. The function is expected to return |
| True if no further output is expected in |file_objects| (this will |
| end the wait loop before |timeout| expires), and False otherwise |
| (more output could be produced). |
| |
| Returns: |
| If |pattern| was found, returns the return value of match_line(). |
| Otherwise, returns None if |wait_func| returned True (signalling the |
| end of the input with no match). |
| |
| Raises a subprocess.TimeoutExpired exception on timeout. This will have |
| a |cmd| field of None. |
| |
| """ |
| |
| t_end = None |
| if timeout is not None: |
| t_end = time.time() + timeout |
| |
| if wait_func is None: |
| # By default, sleep for 200 ms when waiting for new input. |
| def wait_func(): |
| time.sleep(.2) |
| return False |
| |
| if from_start: |
| for file_object in file_objects: |
| file_object.seek(0) |
| |
| end_loop = False |
| while True: |
| for file_object in file_objects: |
| for line in file_object: |
| m = match_line(line.rstrip(), pattern, filter_func) |
| if m is not None: |
| return m |
| |
| if end_loop: |
| break |
| |
| if timeout is not None and time.time() >= t_end: |
| raise subprocess.TimeoutExpired(None, timeout) |
| |
| # The wait function returns True to indicate that no more data will be |
| # produced (e.g. because the producing process terminated). But we still |
| # need to check one last time if the already produced data is matching |
| # the `pattern`. |
| end_loop = wait_func() |
| |
| return None |
| |
| |
| def match_line(line, pattern, filter_func=None): |
| """ |
| Check if a line matches a pattern |
| |
| Line endings (CR/LR) are removed from |line| and neither matched nor |
| returned. |
| |
| pattern can be of two types: |
| 1. A regular expression (any object with a match attribute, or a re.Pattern |
| object in Python 3.7+). In this case, the pattern is matched against all |
| lines and the result of re.match(pattern) (a re.Match object since |
| Python 3.7) is returned. Note that re.match() always matches from the |
| beginning of the line. |
| 2. A string. In this case pattern is compared to each line with |
| startswith(), and the full matching line is returned on a match. |
| If no match is found None is returned. |
| |
| Optionally apply a filter to the line before matching it. |
| """ |
| |
| if isinstance(line, bytes): |
| line = line.rstrip(b'\r\n') |
| else: |
| line = line.rstrip('\r\n') |
| |
| if filter_func is not None: |
| line = filter_func(line) |
| |
| # TODO: Check for an instance of re.Pattern once we move to Python 3.7+ |
| if hasattr(pattern, "match"): |
| return pattern.match(line) |
| else: |
| if line.startswith(pattern): |
| return line |
| |
| return None |
| |
| |
| def filter_remove_device_sw_log_prefix(line): |
| """ |
| Remove the file/line information in log messages produced by device software |
| LOG() macros. |
| """ |
| |
| # See base_log_internal_core() in lib/base/log.c for the format description. |
| pattern = r'^[IWEF?]\d{5} [a-zA-Z0-9\.-_]+:\d+\] ' |
| if isinstance(line, bytes): |
| return re.sub(bytes(pattern, encoding='utf-8'), b'', line) |
| else: |
| return re.sub(pattern, '', line) |
| |
| |
| def filter_remove_sw_test_status_log_prefix(line): |
| """ |
| Remove the logging prefix produced by the sw_test_status DV component. |
| """ |
| |
| # Example of a full prefix to be matched: |
| # 1629002: (../src/lowrisc_dv_sw_test_status_0/sw_test_status_if.sv:42) [TOP.chip_earlgrey_verilator.u_sw_test_status_if] |
| pattern = r'\d+: \(.+/sw_test_status_if\.sv:\d+\) \[TOP\..+\] ' |
| if isinstance(line, bytes): |
| return re.sub(bytes(pattern, encoding='utf-8'), b'', line) |
| else: |
| return re.sub(pattern, '', line) |