[test] Refactor system tests to use pytest as runner
System tests, living in `test/systemtest`, test the whole end-to-end
design of OpenTitan: they initialize the target (e.g. a simulation, an
FPGA, or later an ASIC), load test software as needed, and check for
outputs or other expected behavior. To do so, multiple components need
to work together. For example, a simulation process, components for data
transfer (such as OpenOCD), components which interact with (simulated or
real) external interfaces, etc. Pytest provides a very convenient way to
handle such situations through their fixtures. Our current system tests
make use of this functionality.
What we haven't been using in the past is the ability of pytest to serve
as test runner: select tests, collect their output and results, etc.
(Instead, a shell script was used.)
This PR refactors the system tests to make use of pytest as test runner,
with one main consequence: Tests are now "opinionated", they don't take
configuration parameters through command line arguments any more. Tests
expect build outputs in the distribution directory (`build-bin`) to be
in the well-known layout for this directory.
Notes:
* This commit lays the groundwork to add tests running on an FPGA target
in a similar way as the verilated tests, which should explain some of
the structuring of the code.
* The ability to run the debug "compliance test" built into OpenOCD,
which was present in the `test/systemtest/openocd_verilator_test.py`
file, has been removed temporarily. This test was unmaintained, and
will be brought back in revised form at a later time.
* Equally, the "FPGA test runner" available in
`test/systemtest/functional_fpga_test.py` has been rarely used (as a
poll on Slack showed) and is also removed.
It will be brought back in modified form in a future commit.
Signed-off-by: Philipp Wagner <phw@lowrisc.org>
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 3c611d3..79233ce 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -353,9 +353,9 @@
- template: ci/download-artifacts-template.yml
- bash: |
. util/build_consts.sh
- export VERILATED_SYSTEM_PATH="$BIN_DIR/hw/top_earlgrey/Vtop_earlgrey_verilator"
pytest --version
- ci/run_verilator_pytest.sh
+ pytest test/systemtest/earlgrey/test_sim_verilator.py \
+ --log-cli-level=DEBUG
displayName: Execute tests
- template: ci/run-riscv-compliance.yml
diff --git a/ci/run_verilator_pytest.sh b/ci/run_verilator_pytest.sh
deleted file mode 100755
index 4ca02cc..0000000
--- a/ci/run_verilator_pytest.sh
+++ /dev/null
@@ -1,65 +0,0 @@
-#!/bin/bash
-# Copyright lowRISC contributors.
-# Licensed under the Apache License, Version 2.0, see LICENSE for details.
-# SPDX-License-Identifier: Apache-2.0
-set -e
-
-. util/build_consts.sh
-
-readonly VERILATED_SYSTEM_DEFAULT="build/lowrisc_systems_top_earlgrey_verilator_0.1/sim-verilator/Vtop_earlgrey_verilator"
-readonly SW_BUILD_DEFAULT="$DEV_BIN_DIR"
-
-VERILATED_SYSTEM_PATH="${VERILATED_SYSTEM_PATH:-$VERILATED_SYSTEM_DEFAULT}"
-SW_BUILD_PATH="${SW_BUILD_PATH:-$SW_BUILD_DEFAULT}"
-
-BOOT_ROM_TARGET="boot_rom/boot_rom_sim_verilator.elf"
-
-TEST_TARGETS=(
- "tests/aes_test_sim_verilator.elf"
- "tests/crt_test_sim_verilator.elf"
- "tests/dif_plic_sanitytest_sim_verilator.elf"
- "tests/dif_rv_timer_sanitytest_sim_verilator.elf"
- "tests/dif_uart_sanitytest_sim_verilator.elf"
- "tests/flash_ctrl_test_sim_verilator.elf"
- "tests/pmp_sanitytest_napot_sim_verilator.elf"
- "tests/pmp_sanitytest_tor_sim_verilator.elf"
- "tests/sha256_test_sim_verilator.elf"
- "tests/usbdev_test_sim_verilator.elf"
-)
-
-FAIL_TARGETS=()
-PASS_TARGETS=()
-for target in "${TEST_TARGETS[@]}"; do
- echo "Executing target ${target}"
- set +e
- set -x
- pytest -v -s test/systemtest/functional_verilator_test.py \
- --test_bin "$SW_BUILD_PATH/${target}" \
- --rom_bin "$SW_BUILD_PATH/${BOOT_ROM_TARGET}" \
- --verilator_model "$VERILATED_SYSTEM_PATH"
- if [[ $? == 0 ]]; then
- PASS_TARGETS=("${PASS_TARGETS[@]}" "${target}")
- else
- FAIL_TARGETS=("${FAIL_TARGETS[@]}" "${target}")
- fi
- set +x
- set -e
-done
-
-echo "Passing targets:"
-for target in "${PASS_TARGETS[@]}"; do
- echo "* ${target}"
-done
-
-if [ ${#FAIL_TARGETS[@]} -eq 0 ]; then
- echo "TESTS PASS!"
-else
- echo
- echo "Failing targets:"
- for target in "${FAIL_TARGETS[@]}"; do
- echo "* ${target}"
- done
- echo
- echo "TESTS FAILED!"
- exit 1
-fi
diff --git a/python-requirements.txt b/python-requirements.txt
index 99a9792..4e02949 100644
--- a/python-requirements.txt
+++ b/python-requirements.txt
@@ -19,6 +19,7 @@
premailer
pyelftools
pyftdi
+pyserial
pygments
pytest
pytest-timeout
diff --git a/test/.gitignore b/test/.gitignore
new file mode 100644
index 0000000..71971ce
--- /dev/null
+++ b/test/.gitignore
@@ -0,0 +1,2 @@
+.cache
+.pytest_cache
diff --git a/test/systemtest/.gitignore b/test/systemtest/.gitignore
deleted file mode 100644
index 16d3c4d..0000000
--- a/test/systemtest/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-.cache
diff --git a/test/systemtest/README.md b/test/systemtest/README.md
new file mode 100644
index 0000000..c3aaf16
--- /dev/null
+++ b/test/systemtest/README.md
@@ -0,0 +1,62 @@
+# OpenTitan System Tests
+
+System tests are end-to-end tests for the whole OpenTitan system. They operate
+on build outputs (typically in `build-bin`) and can be used to check that the
+full system, consisting of software, hardware, and tooling, works as expected.
+
+Tests can be executed against different target platforms (such as FPGAs or
+simulations), even though not all OpenTitan-supported targets are supported at
+the moment.
+
+## Available tests
+
+* `earlgrey/test_sim_verilator.py`: Run various software tests against a
+ Verilator-built simulation of the Earl Grey design.
+
+## Run the tests
+
+### Prerequisites
+
+Before the tests can be run, the various components of OpenTitan need to be
+built and present in the `BIN_DIR` directory (typically `$REPO_TOP/build-bin`).
+
+Alternatively, the tests can be executed against any pre-built OpenTitan
+snapshot available from GitHub releases, or from CI.
+
+To test build artifacts from CI follow these steps:
+
+* Go to an individual build job in the
+ ["CI" pipelines of Azure Pipelines](https://dev.azure.com/lowrisc/opentitan/_build?definitionId=9&_a=summary).
+ (The page where the individual steps for a single change are listed, not the
+ job overview page.)
+* In the information bar on the top click on a link labeled "5 published"
+ (or something similar). It will take you to a page titeled "Artifacts".
+* Expand the "opentitan-dist" artifact, and download the file
+ `opentitan-snapshot-xxx.tar.xz`.
+* Untar this file and set the `BIN_DIR` environment variable to point to the
+ extracted directory, for example:
+
+ ```sh
+ tar -xf opentitan-snapshot-20191101-1-2462-g3ad4fd1b.tar.xz
+ export BIN_DIR=$PWD/opentitan-snapshot-20191101-1-2462-g3ad4fd1b
+ ```
+
+### Test execution
+
+The pytest documentation gives a good overview of the available options to run
+tests in the [Usage and Invocations](https://docs.pytest.org/en/stable/usage.html)
+chapter. Here's a quick reference.
+
+```sh
+# Run all tests
+pytest test/systemtest
+
+# Run tests in a specific file
+pytest test/systemtest/earlgrey/test_sim_verilator.py
+
+# Run a specific test in a specific parametrization
+pytest test/systemtest/earlgrey/test_sim_verilator.py -k test_apps_selfchecking[usbdev_test]
+
+# Run tests with more verbose output
+pytest test/systemtest -sv --log-cli-level=DEBUG
+```
diff --git a/test/systemtest/__init__.py b/test/systemtest/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/systemtest/__init__.py
diff --git a/test/systemtest/config.py b/test/systemtest/config.py
new file mode 100644
index 0000000..8e24218
--- /dev/null
+++ b/test/systemtest/config.py
@@ -0,0 +1,17 @@
+# Copyright lowRISC contributors.
+# Licensed under the Apache License, Version 2.0, see LICENSE for details.
+# SPDX-License-Identifier: Apache-2.0
+
+# Self-checking test applications, which return PASS or FAIL after completion.
+TEST_APPS_SELFCHECKING = [
+ "aes_test",
+ "crt_test",
+ "dif_plic_sanitytest",
+ "dif_rv_timer_sanitytest",
+ "dif_uart_sanitytest",
+ "flash_ctrl_test",
+ "pmp_sanitytest_napot",
+ "pmp_sanitytest_tor",
+ "sha256_test",
+ "usbdev_test",
+]
diff --git a/test/systemtest/conftest.py b/test/systemtest/conftest.py
index 117e3ed..2b6f2fe 100644
--- a/test/systemtest/conftest.py
+++ b/test/systemtest/conftest.py
@@ -4,24 +4,13 @@
import logging
import os
-import shutil
-import subprocess
from pathlib import Path
import pytest
-import yaml
+from . import utils
+log = logging.getLogger(__name__)
-def pytest_addoption(parser):
- """Test harness configuration options."""
- parser.addoption("--test_bin", action="store", default="")
- parser.addoption("--rom_bin", action="store", default="")
- parser.addoption("--verilator_model", action="store", default="")
- parser.addoption("--openocd", action="store", default="openocd")
- parser.addoption("--uart_timeout", action="store", default="60")
- parser.addoption("--fpga_uart", action="store", default="")
- parser.addoption("--spiflash", action="store", default="")
- parser.addoption("--log", action="store", default="")
@pytest.hookimpl(tryfirst=True)
def pytest_exception_interact(node, call, report):
@@ -29,45 +18,15 @@
try:
if not report.failed:
return
- if not 'tmp_path' in node.funcargs:
+ if 'tmp_path_factory' not in node.funcargs:
return
- except:
+ except Exception:
return
- tmp_path = str(node.funcargs['tmp_path'])
- logging.debug("================= DUMP OF ALL TEMPORARY FILES =================")
-
- for f in os.listdir(tmp_path):
- f_abs = os.path.join(tmp_path, f)
- if not os.path.isfile(f_abs):
- continue
- logging.debug("vvvvvvvvvvvvvvvvvvvv {} vvvvvvvvvvvvvvvvvvvv".format(f))
- with open(f_abs, 'r') as fp:
- for line in fp.readlines():
- logging.debug(line.rstrip())
- logging.debug("^^^^^^^^^^^^^^^^^^^^ {} ^^^^^^^^^^^^^^^^^^^^".format(f))
-
+ utils.dump_temp_files(node.funcargs['tmp_path_factory'].getbasetemp())
@pytest.fixture(scope="session")
-def localconf(request):
- """Host-local configuration."""
- if os.getenv('OPENTITAN_TEST_LOCALCONF') and os.path.isfile(
- os.environ['OPENTITAN_TEST_LOCALCONF']):
- localconf_yaml_file = os.environ['OPENTITAN_TEST_LOCALCONF']
- else:
- XDG_CONFIG_HOME = os.getenv(
- 'XDG_CONFIG_HOME', os.path.join(os.environ['HOME'], '.config'))
- localconf_yaml_file = os.path.join(XDG_CONFIG_HOME, 'opentitan',
- 'test-localconf.yaml')
- logging.getLogger(__name__).info('Reading configuration from ' +
- localconf_yaml_file)
-
- with open(str(localconf_yaml_file), 'r') as fp:
- return yaml.load(fp)
-
-
-@pytest.fixture(scope="session")
-def topsrcdir(request):
+def topsrcdir():
"""Return the top-level source directory as Path object."""
# TODO: Consider making this configurable using a pytest arg.
path = (Path(os.path.dirname(__file__)) / '..' / '..').resolve()
@@ -76,70 +35,14 @@
@pytest.fixture(scope="session")
-def sw_test_bin(pytestconfig):
- """Return path to software test binary."""
- path = Path(pytestconfig.getoption('test_bin')).resolve()
- assert path.is_file()
- return path
+def bin_dir(topsrcdir):
+ """ Return the BIN_DIR (build output directory) """
+ if 'BIN_DIR' in os.environ:
+ bin_dir = Path(os.environ['BIN_DIR'])
+ log.info("Using build outputs from environment variable BIN_DIR={}".format(str(bin_dir)))
+ else:
+ bin_dir = topsrcdir / 'build-bin'
+ log.info("Using build outputs from $REPO_TOP/build-bin ({})".format(str(bin_dir)))
-@pytest.fixture(scope="session")
-def rom_bin(pytestconfig):
- """Return path to boot_rom binary."""
- path = Path(pytestconfig.getoption('rom_bin')).resolve()
- assert path.is_file()
- return path
-
-
-@pytest.fixture(scope="session")
-def sim_top_build(pytestconfig):
- """Return path to Verilator sim model."""
- path = Path(pytestconfig.getoption('verilator_model')).resolve()
- assert path.is_file()
- return path
-
-
-@pytest.fixture(scope="session")
-def openocd(pytestconfig):
- """Return path to OpenOCD executable."""
- path = Path(pytestconfig.getoption('openocd'))
- # TODO: Require that the OpenOCD executable be passed as a command-line
- # argument in the future, rather than relying on $PATH lookup.
- # assert path.is_file()
- return path
-
-@pytest.fixture(scope="session")
-def uart_timeout(pytestconfig):
- """Return the timeout in seconds for UART to print PASS."""
- return int(pytestconfig.getoption('uart_timeout'))
-
-@pytest.fixture(scope="session")
-def fpga_uart(pytestconfig):
- """Return the path to the UART attached to the FPGA."""
- path = Path(pytestconfig.getoption('fpga_uart')).resolve()
- assert path.exists() and not path.is_dir()
- return path
-
-@pytest.fixture(scope="session")
-def spiflash(pytestconfig):
- """Return the path to the spiflash executable."""
- path = Path(pytestconfig.getoption('spiflash')).resolve()
- assert path.is_file()
- return path
-
-@pytest.fixture(scope="session")
-def logfile(pytestconfig):
- """Return path to logfile."""
- log = pytestconfig.getoption('log')
- if not log:
- return
- # The strict option is only availabe on python 3.6
- # CI currently uses >=3.5.2
- # Turns out however even not using resolve doesn't work.
- # The logging function in 3.5 uses os.path.isabs to check whether
- # path is absolute and does not accept POSIXPATH objects
- # path = Path(log).resolve(strict=False)
- # assert not path.is_dir()
-
- path = os.path.abspath(log)
- assert not os.path.isdir(path)
- return path
+ assert bin_dir.is_dir()
+ return bin_dir
diff --git a/test/systemtest/earlgrey/__init__.py b/test/systemtest/earlgrey/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/systemtest/earlgrey/__init__.py
diff --git a/test/systemtest/earlgrey/test_sim_verilator.py b/test/systemtest/earlgrey/test_sim_verilator.py
new file mode 100644
index 0000000..f357e9a
--- /dev/null
+++ b/test/systemtest/earlgrey/test_sim_verilator.py
@@ -0,0 +1,240 @@
+# Copyright lowRISC contributors.
+# Licensed under the Apache License, Version 2.0, see LICENSE for details.
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import re
+from pathlib import Path
+
+import pytest
+
+from .. import config, utils
+
+log = logging.getLogger(__name__)
+
+
+class VerilatorSimEarlgrey:
+ UART0_SPEED = 9600 # see device/lib/arch/device_sim_verilator.c
+
+ def __init__(self, sim_path: Path, rom_elf_path: Path, work_dir: Path):
+ """ A verilator simulation of the Earl Grey toplevel """
+ assert sim_path.is_file()
+ self._sim_path = sim_path
+
+ assert rom_elf_path.is_file()
+ self._rom_elf_path = rom_elf_path
+
+ assert work_dir.is_dir()
+ self._work_dir = work_dir
+
+ self._log = logging.getLogger(__name__)
+
+ self._uart0_log = None
+ self.uart0_log_path = None
+ self.spi0_log_path = None
+
+ def run(self, flash_elf=None):
+ """
+ Run the simulation
+ """
+
+ self.uart0_log_path = self._work_dir / 'uart0.log'
+
+ cmd_sim = [
+ self._sim_path, '--meminit=rom,' + str(self._rom_elf_path),
+ '+UARTDPI_LOG_uart0=' + str(self.uart0_log_path)
+ ]
+
+ if flash_elf is not None:
+ assert flash_elf.is_file()
+ cmd_sim.append('--meminit=flash,' + str(flash_elf))
+
+ self.p_sim = utils.Process(cmd_sim,
+ logdir=self._work_dir,
+ cwd=self._work_dir,
+ startup_done_expect='Simulation running',
+ startup_timeout=10)
+ self.p_sim.run()
+
+ self._log.info("Simulation running")
+
+ # Find paths to simulated I/O devices
+ # UART
+ self._uart0 = None
+ uart0_match = self.p_sim.find_in_output(
+ re.compile(r'UART: Created (/dev/pts/\d+) for uart0\.'),
+ timeout=1,
+ from_start=True)
+ assert uart0_match is not None
+ self.uart0_device_path = Path(uart0_match.group(1))
+ assert self.uart0_device_path.is_char_device()
+ self._log.debug("Found uart0 device file at {}".format(
+ str(self.uart0_device_path)))
+
+ assert self.uart0_log_path.is_file()
+ self._uart0_log = open(str(self.uart0_log_path), 'rb')
+
+ # SPI
+ spi0_match = self.p_sim.find_in_output(
+ re.compile(r'SPI: Created (/dev/pts/\d+) for spi0\.'),
+ timeout=1,
+ from_start=True)
+ assert spi0_match is not None
+ self.spi0_device_path = Path(spi0_match.group(1))
+ assert self.spi0_device_path.is_char_device()
+ self._log.debug("Found spi0 device file at {}".format(
+ str(self.spi0_device_path)))
+
+ self.spi0_log_path = self._work_dir / 'spi0.log'
+ assert self.spi0_log_path.is_file()
+
+ # GPIO
+ self.gpio0_fifo_write_path = self._work_dir / 'gpio0-write'
+ assert self.gpio0_fifo_write_path.is_fifo()
+
+ self.gpio0_fifo_read_path = self._work_dir / 'gpio0-read'
+ assert self.gpio0_fifo_read_path.is_fifo()
+
+ self._log.info("Simulation startup completed.")
+
+ def uart0(self):
+ if self._uart0 is None:
+ log_dir_path = self._work_dir / 'uart0'
+ log_dir_path.mkdir()
+ log.info("Opening UART on device {} ({} baud)".format(
+ str(self.uart0_device_path), self.UART0_SPEED))
+ self._uart0 = utils.LoggingSerial(
+ str(self.uart0_device_path),
+ self.UART0_SPEED,
+ timeout=1,
+ log_dir_path=log_dir_path,
+ default_filter_func=utils.filter_remove_device_sw_log_prefix)
+
+ return self._uart0
+
+ def terminate(self):
+ """ Gracefully terminate the simulation """
+ if self._uart0 is not None:
+ self._uart0.close()
+
+ if self._uart0_log is not None:
+ self._uart0_log.close()
+
+ self.p_sim.send_ctrl_c()
+
+ # Give the process some time to clean up
+ self.p_sim.proc.wait(timeout=5)
+ assert self.p_sim.proc.returncode == 0
+
+ try:
+ self.p_sim.terminate()
+ except ProcessLookupError:
+ # process is already dead
+ pass
+
+ def find_in_uart0(self,
+ pattern,
+ timeout,
+ filter_func=None,
+ from_start=False):
+ assert self._uart0_log
+
+ if filter_func is None:
+ # The default filter function for all device software in OpenTitan.
+ filter_func = utils.filter_remove_device_sw_log_prefix
+
+ return utils.find_in_files([self._uart0_log],
+ pattern,
+ timeout,
+ filter_func=filter_func,
+ from_start=from_start)
+
+
+@pytest.fixture(params=config.TEST_APPS_SELFCHECKING)
+def app_selfchecking_elf(request, bin_dir):
+ """ A self-checking device application as ELF for Verilator simulation """
+
+ test_filename = request.param + '_sim_verilator.elf'
+ bin_path = bin_dir / 'sw/device/tests' / test_filename
+ assert bin_path.is_file()
+ return bin_path
+
+
+# The following tests use the UART output from the log file written by the
+# UARTDPI module, and not the simulated UART device (/dev/pty/N) to ensure
+# reliable testing: As soon as the device application finishes, the simulation
+# ends and the UART device becomes unavailable to readers.
+# Therefore, if we do not read quickly enough, we miss the PASS/FAIL indication
+# and the test never finishes.
+
+
+def test_apps_selfchecking(tmp_path, bin_dir, app_selfchecking_elf):
+ """
+ Run a self-checking application on a Earl Grey Verilator simulation
+
+ The ROM is initialized with the default boot ROM, the flash is initialized
+ with |app_selfchecking_elf|.
+
+ Self-checking applications are expected to return PASS or FAIL in the end.
+ """
+
+ sim_path = bin_dir / "hw/top_earlgrey/Vtop_earlgrey_verilator"
+ rom_elf_path = bin_dir / "sw/device/boot_rom/boot_rom_sim_verilator.elf"
+
+ sim = VerilatorSimEarlgrey(sim_path, rom_elf_path, tmp_path)
+
+ sim.run(app_selfchecking_elf)
+
+ bootmsg_exp = b'Boot ROM initialisation has completed, jump into flash!'
+ assert sim.find_in_uart0(
+ bootmsg_exp,
+ timeout=120) is not None, "End-of-bootrom string not found."
+
+ log.debug("Waiting for pass string from device test")
+
+ result_match = sim.find_in_uart0(re.compile(rb'^(PASS|FAIL)!$'),
+ timeout=240)
+ assert result_match is not None, "PASS/FAIL indication not found in test output."
+
+ result_msg = result_match.group(1)
+ log.info("Test ended with {}".format(result_msg))
+ assert result_msg == b'PASS'
+
+ sim.terminate()
+
+
+def test_spiflash(tmp_path, bin_dir):
+ """ Load a single application to the Verilator simulation using spiflash """
+
+ sim_path = bin_dir / "hw/top_earlgrey/Vtop_earlgrey_verilator"
+ rom_elf_path = bin_dir / "sw/device/boot_rom/boot_rom_sim_verilator.elf"
+
+ sim = VerilatorSimEarlgrey(sim_path, rom_elf_path, tmp_path)
+ sim.run()
+
+ log.debug("Waiting for simulation to be ready for SPI input")
+ spiwait_msg = b'HW initialisation completed, waiting for SPI input...'
+ assert sim.find_in_uart0(spiwait_msg, timeout=120)
+
+ log.debug("SPI is ready, continuing with spiload")
+ app_bin = bin_dir / 'sw/device/tests/dif_uart_sanitytest_sim_verilator.bin'
+ spiflash = bin_dir / 'sw/host/spiflash/spiflash'
+ utils.load_sw_over_spi(tmp_path, spiflash, app_bin,
+ ['--verilator', sim.spi0_device_path])
+
+ bootmsg_exp = b'Boot ROM initialisation has completed, jump into flash!'
+ assert sim.find_in_uart0(
+ bootmsg_exp,
+ timeout=600) is not None, "End-of-bootrom string not found."
+
+ log.debug("Waiting for pass string from device test")
+
+ result_match = sim.find_in_uart0(re.compile(rb'^(PASS|FAIL)!$'),
+ timeout=240)
+ assert result_match is not None, "PASS/FAIL indication not found in test output."
+
+ result_msg = result_match.group(1)
+ log.info("Test ended with {}".format(result_msg))
+ assert result_msg == b'PASS'
+
+ sim.terminate()
diff --git a/test/systemtest/functional_fpga_test.py b/test/systemtest/functional_fpga_test.py
deleted file mode 100644
index 4de7377..0000000
--- a/test/systemtest/functional_fpga_test.py
+++ /dev/null
@@ -1,77 +0,0 @@
-# Copyright lowRISC contributors.
-# Licensed under the Apache License, Version 2.0, see LICENSE for details.
-# SPDX-License-Identifier: Apache-2.0
-r"""Runs a binary on an FPGA, which is expected to write
-one of "PASS!\r\n" or "FAIL!\r\n" to the UART to determine success or failure.
-Failing to write either will result in a timeout.
-
-This test requires some setup and some configuration. This test expects you
-to have:
- - A supported FPGA connected to your workstation.
- - That FPGA must be flashed with a synthesized EarlGrey bitfile, which has
- been spliced with the OpenTitan bootloader.
- - A prebuilt spiflash executable.
-
-You must then provide this test with the .bin file for the test, the UART
-device path for the FPGA, and the spiflash executable. For example:
-
-$ cd ${REPO_TOP}
-$ pytest -s -v test/systemtest/functional_fpga_test.py \
- --test_bin sw/device/tests/hmac/sw.bin \
- --fpga_uart /dev/ttyUSB2 \
- --spiflash sw/host/spiflash/spiflash
-"""
-
-import logging
-from pathlib import Path
-import re
-
-import pytest
-
-import test_utils
-
-logging.basicConfig(level=logging.DEBUG)
-
-
-class TestFunctionalFpga:
- """
- Execute a test binary on a locally connected FPGA, using UART
- output to validate test success or failure.
- """
- @pytest.fixture
- def spiflash_proc(self, tmp_path, spiflash, sw_test_bin):
- cmd_flash = [str(spiflash), '--input', str(sw_test_bin)]
- p_flash = test_utils.Process(cmd_flash,
- logdir=str(tmp_path),
- cwd=str(tmp_path),
- startup_done_expect='Running SPI flash update.',
- startup_timeout=10)
- p_flash.run()
-
- yield p_flash
-
- p_flash.terminate()
-
- def test_execute_binary(self, spiflash_proc, fpga_uart, uart_timeout, logfile):
- """
- Executes the binary and inspects its UART for "PASS!\r\n" or "FAIL!\r\n".
- """
-
- logger = logging.getLogger(__name__)
- test_utils.setup_logfile(logger, logfile)
-
-
- # Open the UART device and read line by line until we pass or fail.
- with fpga_uart.open('rb') as uart_device:
- uart_fd = uart_device.fileno()
- pattern = re.compile('.*?(PASS!\r\n|FAIL!\r\n)')
- match = test_utils.stream_fd_to_log(uart_fd, logger, pattern,
- uart_timeout)
-
- if match == None:
- pytest.fail('Deadline exceeded: did not see PASS! or FAIL! within %ds.', uart_timeout)
-
- if match.group(1) == 'PASS!\r\n':
- logger.debug('Got PASS! from binary.')
- else:
- pytest.fail('Got FAIL! from binary.')
diff --git a/test/systemtest/functional_verilator_test.py b/test/systemtest/functional_verilator_test.py
deleted file mode 100644
index 4ae54d5..0000000
--- a/test/systemtest/functional_verilator_test.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright lowRISC contributors.
-# Licensed under the Apache License, Version 2.0, see LICENSE for details.
-# SPDX-License-Identifier: Apache-2.0
-r"""Runs a binary on the verilated system, which is expected to write
-one of "PASS!\r\n" or "FAIL!\r\n" to UART to determine success or failure.
-Failing to write either will result in a timeout.
-
-This test requires some configuration options. Use the following steps to run
-the test manually after building a Verilator simulation and the device boot ROM
-and a device software target.
-
-$ cd ${REPO_TOP}
-$ pytest -s -v test/systemtest/functional_verilator_test.py \
- --test_bin sw.elf \
- --rom_bin boot_rom.elf \
- --verilator_model build/lowrisc_systems_top_earlgrey_verilator_0.1/sim-verilator/Vtop_earlgrey_verilator
-"""
-
-import logging
-import re
-import pytest
-
-import test_utils
-
-logging.basicConfig(level=logging.DEBUG)
-
-class TestFunctionalVerilator:
- """
- Execute a test binary in a Verilator-simulated hardware build, using UART
- output to validate test success or failure.
- """
- @pytest.fixture
- def sim_top_earlgrey(self, tmp_path, sim_top_build, sw_test_bin, rom_bin):
- cmd_sim = [
- str(sim_top_build),
- '--meminit=flash,' + str(sw_test_bin),
- '--meminit=rom,' + str(rom_bin)
- ]
- p_sim = test_utils.Process(cmd_sim,
- logdir=str(tmp_path),
- cwd=str(tmp_path),
- startup_done_expect='Simulation running',
- startup_timeout=10)
- p_sim.run()
-
- yield p_sim
-
- p_sim.terminate()
-
- @pytest.mark.timeout(240)
- def test_execute_binary(self, sim_top_earlgrey, uart_timeout, logfile):
- """
- Executes the binary and inspects its UART for "PASS!\r\n" or "FAIL!\r\n".
- """
-
- logger = logging.getLogger(__name__)
- test_utils.setup_logfile(logger, logfile)
-
-
- # Verilator will print the string "UART: created /dev/pts/#" to
- # indicate which pseudoterminal the UART port is bound to.
- uart_match = sim_top_earlgrey.find_in_output(
- re.compile('UART: Created (/dev/pts/\\d+)'), 5)
-
- assert uart_match is not None
- uart_path = uart_match.group(1)
- logger.info("Found UART port at %s." % uart_path)
-
- # Now, open the UART device and read line by line until we pass or
- # fail.
- with open(uart_path, 'rb') as uart_device:
- uart_fd = uart_device.fileno()
- pattern = re.compile('.*?(PASS!\r\n|FAIL!\r\n)')
- match = test_utils.stream_fd_to_log(uart_fd, logger, pattern,
- uart_timeout)
- assert match is not None, ('Deadline exceeded: did not see PASS! or FAIL! within %ds.' % uart_timeout)
-
- assert match.group(1).strip() == 'PASS!'
diff --git a/test/systemtest/openocd_verilator_test.py b/test/systemtest/openocd_verilator_test.py
deleted file mode 100644
index a4e89fe..0000000
--- a/test/systemtest/openocd_verilator_test.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# Copyright lowRISC contributors.
-# Licensed under the Apache License, Version 2.0, see LICENSE for details.
-# SPDX-License-Identifier: Apache-2.0
-r"""Runs OpenOCD compliance test against Verilator target.
-
-This test requires some configuration options. Use the following steps to run
-the test manually after building Verilator and the boot_rom and hello_world
-targets.
-
-$ cd ${REPO_TOP}
-$ pytest -s -v test/systemtest/openocd_verilator_test.py \
- --test_bin hello_world.elf \
- --rom_bin boot_rom.elf \
- --verilator_model build/lowrisc_systems_top_earlgrey_verilator_0.1/sim-verilator/Vtop_earlgrey_verilator
-
-In some cases the pytest environment may not be able to find the openocd binary.
-To work around this issue, run the test with an additional configuration option:
-
- --openocd /tools/openocd/bin/openocd
-"""
-
-import logging
-import re
-import subprocess
-import time
-import pytest
-
-import test_utils
-
-logging.basicConfig(level=logging.DEBUG)
-
-class TestCoreVerilator:
- """Test core functionality in a Verilator-simulated hardware build."""
-
- @pytest.fixture
- def sim_top_earlgrey(self, tmp_path, sim_top_build, sw_test_bin, rom_bin):
- cmd_sim = [
- str(sim_top_build),
- '--meminit=flash,' + str(sw_test_bin),
- '--meminit=rom,' + str(rom_bin)
- ]
- p_sim = test_utils.Process(
- cmd_sim,
- logdir=str(tmp_path),
- cwd=str(tmp_path),
- startup_done_expect='Simulation running',
- startup_timeout=10)
- p_sim.run()
-
- yield p_sim
-
- p_sim.terminate()
-
- def test_openocd_riscv_compliancetest(self, tmp_path, sim_top_earlgrey,
- topsrcdir, openocd):
- """Run RISC-V Debug compliance test built into OpenOCD."""
- assert subprocess.call([str(openocd), '--version']) == 0
- cmd_openocd = [
- str(openocd),
- '-s', str(topsrcdir / 'util' / 'openocd'),
- '-f', 'board/lowrisc-earlgrey-verilator.cfg',
- '-c', 'init; riscv test_compliance; shutdown'
- ]
- p_openocd = test_utils.Process(
- cmd_openocd, logdir=str(tmp_path), cwd=str(tmp_path))
- p_openocd.run()
-
- logging.getLogger(__name__).info(
- "OpenOCD should terminate itself; give it up to 5 minutes")
- p_openocd.proc.wait(timeout=300)
- assert p_openocd.proc.returncode == 0
-
- logging.getLogger(__name__).info(
- "Now wait 60 seconds until the program has finished execution")
- time.sleep(60)
-
- try:
- p_openocd.terminate()
- except ProcessLookupError:
- # process is already dead
- pass
diff --git a/test/systemtest/test_utils.py b/test/systemtest/test_utils.py
deleted file mode 100644
index 2868619..0000000
--- a/test/systemtest/test_utils.py
+++ /dev/null
@@ -1,274 +0,0 @@
-# Copyright lowRISC contributors.
-# Licensed under the Apache License, Version 2.0, see LICENSE for details.
-# SPDX-License-Identifier: Apache-2.0
-
-import difflib
-import filecmp
-import inspect
-import logging
-import os
-import re
-import select
-import shlex
-import signal
-import subprocess
-import time
-
-
-class Process:
- """Utility class used to spawn an interact with processs.s"""
- def __init__(self,
- cmd,
- logdir,
- cwd=None,
- startup_done_expect=None,
- startup_timeout=None):
- """Creates Process instance.
-
- Args:
- cmd: Command line argument used to spawn process.
- logdir: Directory used to store STDOUT and STDERR output.
- cwd: Working directory used to spawn process.
- startup_done_expect: Pattern used to check at process startup time.
- startup_timeout: Timeout in seconds for |startup_done_expect| check.
- """
- if isinstance(cmd, str):
- self.cmd = shlex.split(cmd)
- else:
- self.cmd = cmd
- self.logdir = logdir
- self.cwd = cwd
- self.startup_done_expect = startup_done_expect
- self.startup_timeout = startup_timeout
- self.proc = None
- self.logger = logging.getLogger(__name__)
-
- self._f_stdout = None
- self._f_stderr = None
- self._f_stdout_r = None
- self._f_stderr_r = None
-
- def __del__(self):
- try:
- self.proc.kill()
- self._f_stdout.close()
- self._f_stderr.close()
- self._f_stdout_r.close()
- self._f_stderr_r.close()
- except:
- pass
-
- def run(self):
- """Start process with command line configured in constructor."""
- cmd_name = os.path.basename(self.cmd[0])
-
- # Enforce line-buffered STDOUT even when sending STDOUT/STDERR to file.
- # If applications don't fflush() STDOUT manually, STDOUT goes through
- # a 4kB buffer before we see any output, which prevents searching for
- # the string indicating a successful startup.
- # see discussion at http://www.pixelbeat.org/programming/stdio_buffering/
- cmd = ['stdbuf', '-oL'] + self.cmd
- self.logger.info("Running command " + ' '.join(cmd))
-
- logfile_stdout = os.path.join(self.logdir,
- "{}.stdout".format(cmd_name))
- logfile_stderr = os.path.join(self.logdir,
- "{}.stderr".format(cmd_name))
- self.logger.debug("Capturing STDOUT at " + logfile_stdout)
- self.logger.debug("Capturing STDERR at " + logfile_stderr)
-
- self._f_stdout = open(logfile_stdout, 'w')
- self._f_stderr = open(logfile_stderr, 'w')
- self.proc = subprocess.Popen(cmd,
- cwd=self.cwd,
- universal_newlines=True,
- bufsize=1,
- stdin=subprocess.PIPE,
- stdout=self._f_stdout,
- stderr=self._f_stderr)
-
- self._f_stdout_r = open(logfile_stdout, 'r')
- self._f_stderr_r = open(logfile_stderr, 'r')
-
- # no startup match pattern given => startup done!
- if self.startup_done_expect == None:
- return True
-
- # check if the string indicating a successful startup appears in the
- # the program output (STDOUT or STDERR)
- init_done = self.find_in_output(pattern=self.startup_done_expect,
- timeout=self.startup_timeout)
-
- if init_done == None:
- raise subprocess.TimeoutExpired
-
- self.logger.info("Startup sequence matched, startup done.")
-
- return True
-
- def terminate(self):
- """Terminates process started by run call."""
- if not self.proc:
- return
- self.proc.terminate()
-
- def send_ctrl_c(self):
- """Sends SIGINT to process started by run call."""
- if not self.proc:
- return
- self.proc.send_signal(signal.SIGINT)
-
- def expect(self, stdin_data=None, pattern=None, timeout=None):
- """Write send to STDIN and check if the output is as expected.
-
- Args:
- stdin_data: Data to send to STDIN.
- pattern: Pattern to search for after sending |stdin_data|.
- timeout: Timeout in seconds for |pattern| check.
- Returns:
- True if |pattern| found, False otherwise.
- """
- # We don't get STDOUT/STDERR from subprocess.communicate() as it's
- # redirected to file. We need to read the files instead.
-
- # XXX: races (false positives) can happen here if output is generated
- # before the input is sent to the process.
- if pattern == None:
- self._f_stdout_r.seek(0, 2)
-
- self.proc.stdin.write(stdin_data)
- self.proc.stdin.flush()
-
- if pattern == None:
- return True
-
- return self.find_in_output(pattern, timeout) != None
-
- def find_in_output(self, pattern, timeout):
- """Read STDOUT and STDERR to find an expected pattern.
-
- Both streams are reset to the start of the stream before searching.
- pattern can be of two types:
- 1. A regular expression (a re object). In this case, the pattern is
- matched against all lines and the result of re.match(pattern) is
- returned. Multi-line matches are not supported.
- 2. A string. In this case pattern is compared to each line with
- startswith(), and the full matching line is returned on a match.
- If no match is found None is returned.
- timeout is given in seconds.
-
- Args:
- pattern: Pattern to search for in STDOUT or STDERR.
- timeout: Timeout in seconds for |pattern| check.
- Returns:
- String containing |pattern|, None otherwise.
- """
-
- if timeout != None:
- t_end = time.time() + timeout
-
- # reset streams
- self._f_stdout_r.seek(0)
- self._f_stderr_r.seek(0)
-
- while True:
- # check STDOUT as long as there is one
- i = 0
- for line in self._f_stdout_r:
- i += 1
- if hasattr(pattern, "match"):
- m = pattern.match(line)
- if m:
- return m
- else:
- if line.startswith(pattern):
- return line
-
- # Check if we exceed the timeout while reading from STDOUT
- # do so only every 100 lines to reduce the performance impact.
- if timeout != None and i % 100 == 99 and time.time() >= t_end:
- break
-
- # check STDERR as long as there is one
- i = 0
- for line in self._f_stderr_r:
- i += 1
- if hasattr(pattern, "match"):
- m = pattern.match(line)
- if m:
- return m
- else:
- if line.startswith(pattern):
- return line
-
- # Check if we exceed the timeout while reading from STDERR
- # do so only every 100 lines to reduce the performance impact.
- if timeout != None and i % 100 == 99 and time.time() >= t_end:
- break
-
- # wait for 200ms for new output
- if timeout != None:
- try:
- self.proc.wait(timeout=0.2)
- except subprocess.TimeoutExpired:
- pass
-
- return None
-
-
-def stream_fd_to_log(fd, logger, pattern, timeout=None):
- """
- Streams lines from the given fd to log until pattern matches.
-
- Returns the match object derived from pattern.match(), or None if
- the timeout expires.
- """
-
- deadline = None
- if timeout != None:
- deadline = time.monotonic() + timeout
-
- os.set_blocking(fd, False)
- line_of_output = b''
- while True:
- curtime = time.monotonic()
- if deadline != None and curtime > deadline:
- return None
-
- if line_of_output.endswith(b'\n'):
- line_of_output = b''
-
- # select() on the fd so that we don't waste time reading when
- # we wouldn't get anything out of it.
- if deadline != None:
- rlist, _, _ = select.select([fd], [], [],
- deadline - curtime)
- else:
- rlist, _, _ = select.select([fd], [], [])
-
- if len(rlist) == 0:
- continue
-
- raw_bytes = os.read(fd, 1024)
- lines = raw_bytes.splitlines(True)
-
- for line in lines:
- line_of_output += line
- if not line_of_output.endswith(b'\n'):
- break
-
- logger.debug('fd#%d: %s' % (fd, line_of_output))
- match = pattern.match(line_of_output.decode('utf-8'))
- if match != None:
- return match
-
- line_of_output = b''
-
-# If logfile option was given, log all outputs to file.
-def setup_logfile(logger, logfile):
- if logfile:
- logger.debug("Logfile at %s" % (logfile))
- logger.setLevel(logging.DEBUG)
- fh = logging.FileHandler(filename=logfile, mode='w')
- fh.setLevel(logging.DEBUG)
- logger.addHandler(fh)
diff --git a/test/systemtest/utils.py b/test/systemtest/utils.py
new file mode 100644
index 0000000..cf15c43
--- /dev/null
+++ b/test/systemtest/utils.py
@@ -0,0 +1,452 @@
+# Copyright lowRISC contributors.
+# Licensed under the Apache License, Version 2.0, see LICENSE for details.
+# SPDX-License-Identifier: Apache-2.0
+
+import locale
+import logging
+import os
+import re
+import shlex
+import signal
+import subprocess
+import time
+from pathlib import Path
+
+import serial
+
+log = logging.getLogger(__name__)
+
+
+class Process:
+ def __init__(self,
+ cmd,
+ logdir,
+ cwd=None,
+ startup_done_expect=None,
+ startup_timeout=None,
+ default_filter_func=None):
+ """Utility class used to spawn an interact with processes.
+
+ Args:
+ cmd: Command line argument used to spawn process.
+ logdir: Directory used to store STDOUT and STDERR output.
+ cwd: Working directory used to spawn process.
+ startup_done_expect: Pattern used to check at process startup time.
+ startup_timeout: Timeout in seconds for |startup_done_expect| check.
+ default_filter_func: Default function to filter all stdout/stderr
+ output with when matching it with find_in_output().
+ """
+ if isinstance(cmd, str):
+ self.cmd = shlex.split(cmd)
+ else:
+ self.cmd = [str(c) for c in cmd]
+ self.logdir = str(logdir)
+ self.cwd = str(cwd)
+ self.startup_done_expect = startup_done_expect
+ self.startup_timeout = startup_timeout
+ self.proc = None
+ self.logger = logging.getLogger(__name__)
+
+ self._f_stdout = None
+ self._f_stderr = None
+ self._f_stdout_r = None
+ self._f_stderr_r = None
+
+ self.default_filter_func = default_filter_func
+
+ def __del__(self):
+ try:
+ self.proc.kill()
+ self._f_stdout.close()
+ self._f_stderr.close()
+ self._f_stdout_r.close()
+ self._f_stderr_r.close()
+ except Exception:
+ pass
+
+ def is_running(self):
+ """ Check if the process is running
+
+ Returns:
+ True if the process is running, False otherwise
+ """
+ return self.proc and self.proc.poll() is not None
+
+ def run(self):
+ """Start process with command line configured in constructor."""
+ cmd_name = os.path.basename(self.cmd[0])
+
+ # Enforce line-buffered STDOUT even when sending STDOUT/STDERR to file.
+ # If applications don't fflush() STDOUT manually, STDOUT goes through
+ # a 4kB buffer before we see any output, which prevents searching for
+ # the string indicating a successful startup.
+ # see discussion at http://www.pixelbeat.org/programming/stdio_buffering/
+ cmd = ['stdbuf', '-oL'] + self.cmd
+ self.logger.info("Running command " + ' '.join(cmd))
+
+ logfile_stdout = os.path.join(self.logdir,
+ "{}.stdout".format(cmd_name))
+ logfile_stderr = os.path.join(self.logdir,
+ "{}.stderr".format(cmd_name))
+ self.logger.debug("Capturing STDOUT at " + logfile_stdout)
+ self.logger.debug("Capturing STDERR at " + logfile_stderr)
+
+ self._f_stdout = open(logfile_stdout, 'w')
+ self._f_stderr = open(logfile_stderr, 'w')
+ self.proc = subprocess.Popen(cmd,
+ cwd=self.cwd,
+ universal_newlines=True,
+ bufsize=1, # Use line buffering
+ stdin=subprocess.PIPE,
+ stdout=self._f_stdout,
+ stderr=self._f_stderr)
+
+ self._f_stdout_r = open(logfile_stdout, 'r')
+ self._f_stderr_r = open(logfile_stderr, 'r')
+
+ # no startup match pattern given => startup done!
+ if self.startup_done_expect is None:
+ return True
+
+ # check if the string indicating a successful startup appears in the
+ # the program output (STDOUT or STDERR)
+ init_done = self.find_in_output(pattern=self.startup_done_expect,
+ timeout=self.startup_timeout)
+
+ if init_done is None:
+ raise subprocess.TimeoutExpired
+
+ self.logger.info("Startup sequence matched, startup done.")
+
+ return True
+
+ def terminate(self):
+ """Terminates process started by run call."""
+ if self.proc is not None:
+ self.proc.terminate()
+
+ def send_ctrl_c(self):
+ """Sends SIGINT to process started by run call."""
+ if self.proc is not None:
+ self.proc.send_signal(signal.SIGINT)
+
+ def expect(self, stdin_data=None, pattern=None, timeout=None):
+ """Write data to STDIN and check if the output is as expected.
+
+ Args:
+ stdin_data: Data to send to STDIN.
+ pattern: Pattern to search for after sending |stdin_data|.
+ timeout: Timeout in seconds for |pattern| check.
+ Returns:
+ True if |pattern| found, False otherwise.
+ """
+ # We don't get STDOUT/STDERR from subprocess.communicate() as it's
+ # redirected to file. We need to read the files instead.
+
+ # XXX: races (false positives) can happen here if output is generated
+ # before the input is sent to the process.
+ if pattern is None:
+ self._f_stdout_r.seek(0, 2)
+
+ self.proc.stdin.write(stdin_data)
+ self.proc.stdin.flush()
+
+ if pattern is None:
+ return True
+
+ return self.find_in_output(pattern, timeout) is not None
+
+ def find_in_output(self,
+ pattern,
+ timeout,
+ from_start=False,
+ filter_func=None):
+ """Read STDOUT and STDERR to find an expected pattern.
+
+ See find_in_files() for more documentation.
+ """
+
+ if filter_func is None:
+ filter_func = self.default_filter_func
+
+ def wait_func():
+ """ Wait up to 0.2s until the process terminates.
+
+ Returns:
+ True if the subprocess terminated and a further wait will not
+ produce more output, False otherwise.
+ """
+ try:
+ self.proc.wait(timeout=0.2)
+ except subprocess.TimeoutExpired:
+ # The process did not yet terminate.
+ return False
+
+ # The process did terminate.
+ return True
+
+ return find_in_files([self._f_stdout_r, self._f_stderr_r],
+ pattern,
+ timeout,
+ filter_func=filter_func,
+ from_start=from_start,
+ wait_func=wait_func)
+
+
+class LoggingSerial(serial.Serial):
+ """ Acess to a serial console which logs all read/written data to file. """
+ def __init__(self,
+ *args,
+ log_dir_path,
+ default_filter_func=None,
+ **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self._log_to_device_fp = open(str(log_dir_path / 'to-device.log'),
+ 'wb')
+ self._log_from_device_fp = open(str(log_dir_path / 'from-device.log'),
+ 'wb')
+
+ self.default_filter_func = default_filter_func
+
+ log.debug("Logging UART communication for {} to {}".format(
+ self.port, str(log_dir_path)))
+
+ def read(self, size=1):
+ bytes = super().read(size)
+ self._log_from_device_fp.write(bytes)
+
+ # Explicitly flush the log file to ensure that data is present in the
+ # log file when the conftest log dumping routines read the file after a
+ # failed test.
+ self._log_from_device_fp.flush()
+
+ return bytes
+
+ def write(self, data):
+ retcode = super().write(data)
+ self._log_to_device_fp.write(data)
+
+ # Explicitly flush the log file to ensure that data is present in the
+ # log file when the conftest log dumping routines read the file after a
+ # failed test.
+ self._log_to_device_fp.flush()
+
+ return retcode
+
+ def close(self):
+ if self._log_to_device_fp:
+ self._log_to_device_fp.close()
+
+ if self._log_from_device_fp:
+ self._log_from_device_fp.close()
+
+ super().close()
+
+ def find_in_output(self, pattern, timeout=None, filter_func=None):
+ """ Expect a pattern to appear in one of the output lines.
+
+ See the documentation of match_line() for a description of |pattern|.
+
+ Args:
+ pattern: Pattern to search for
+ timeout: Timeout in seconds for |pattern| check.
+ filter_func: Function to filter the output with before applying
+ |pattern|. If none, the |self.default_filter_func| is used.
+ Returns:
+ None if |pattern| was not found, the return value of match_line()
+ otherwise.
+ """
+
+ if filter_func is None:
+ filter_func = self.default_filter_func
+
+ t_end = None
+ if timeout is not None:
+ t_end = time.time() + timeout
+
+ line = self.readline()
+ while True:
+ m = match_line(line, pattern, filter_func)
+ if m is not None:
+ return m
+
+ if timeout is not None and time.time() >= t_end:
+ break
+
+ line = self.readline()
+
+ return None
+
+
+def dump_temp_files(tmp_path):
+ """ Dump all files in a directory (typically logs) """
+ logging.debug(
+ "================= DUMP OF ALL TEMPORARY FILES =================")
+
+ tmp_files = [
+ Path(root) / f for root, dirs, files in os.walk(str(tmp_path))
+ for f in files
+ ]
+
+ textchars = bytearray({7, 8, 9, 10, 12, 13, 27} |
+ set(range(0x20, 0x100)) - {0x7f})
+
+ def is_binary_string(bytes):
+ return bool(bytes.translate(None, textchars))
+
+ for f in tmp_files:
+ if f.name == '.lock':
+ continue
+
+ logging.debug("vvvvvvvvvvvvvvvvvvvv {} vvvvvvvvvvvvvvvvvvvv".format(f))
+
+ if not f.is_file():
+ logging.debug("[Not a regular file.]")
+ elif f.stat().st_size > 50 * 1024:
+ logging.debug("[File is too large to be shown ({} bytes).]".format(
+ f.stat().st_size))
+ else:
+ with open(str(f), 'rb') as fp:
+ data = fp.read(1024)
+ if is_binary_string(data):
+ logging.debug(
+ "[File contains {} bytes of binary data.]".format(
+ f.stat().st_size))
+ else:
+ fp.seek(0)
+ for line in fp:
+ line_str = line.decode(locale.getpreferredencoding(),
+ errors='backslashreplace')
+ logging.debug(line_str.rstrip())
+ logging.debug(
+ "^^^^^^^^^^^^^^^^^^^^ {} ^^^^^^^^^^^^^^^^^^^^\n".format(f))
+
+
+def load_sw_over_spi(tmp_path, spiflash_path, sw_test_bin, spiflash_args=[]):
+ """ Use the spiflash utility to load software onto a device. """
+
+ log.info("Flashing device software from {} over SPI".format(
+ str(sw_test_bin)))
+
+ cmd_flash = [spiflash_path, '--input', sw_test_bin] + spiflash_args
+ p_flash = Process(cmd_flash, logdir=tmp_path, cwd=tmp_path)
+ p_flash.run()
+ p_flash.proc.wait(timeout=600)
+ assert p_flash.proc.returncode == 0
+
+ log.info("Device software flashed.")
+
+
+def find_in_files(file_objects,
+ pattern,
+ timeout,
+ from_start=False,
+ filter_func=None,
+ wait_func=None):
+ """Find a pattern in a list of file objects (file descriptors)
+
+ See the documentation of match_line() for a description of |pattern|.
+
+ Args:
+ pattern: Pattern to search for
+ timeout: Timeout in seconds for |pattern| check. Set to None to wait
+ indefinitely.
+ from_start: Search from the start of the given file objects.
+ filter_func: Function to filter the output with before applying
+ |pattern|. If none, the |default_filter_func| is used.
+ wait_func: Function to call to wait. The function is expected to return
+ True if no further output is expected in |file_objects| (this will
+ end the wait loop before |timeout| expires), and False otherwise
+ (more output could be produced).
+ Returns:
+ None if |pattern| was not found, the return value of match_line()
+ otherwise.
+ """
+
+ t_end = None
+ if timeout is not None:
+ t_end = time.time() + timeout
+
+ if wait_func is None:
+ # By default, sleep for 200 ms when waiting for new input.
+ def wait_func():
+ time.sleep(.2)
+ return False
+
+ if from_start:
+ for file_object in file_objects:
+ file_object.seek(0)
+
+ while True:
+ i = 0
+ for file_object in file_objects:
+ i = 0
+ for line in file_object:
+ i += 1
+
+ m = match_line(line.rstrip(), pattern, filter_func)
+ if m is not None:
+ return m
+
+ # Check if we exceed the timeout. Do so only every 100 lines to reduce
+ # the performance impact.
+ if timeout is not None and i % 100 == 99 and time.time() >= t_end:
+ break
+
+ end_loop = wait_func()
+ if end_loop:
+ break
+
+ return None
+
+
+def match_line(line, pattern, filter_func=None):
+ """
+ Check if a line matches a pattern
+
+ Line endings (CR/LR) are removed from |line| and neither matched nor
+ returned.
+
+ pattern can be of two types:
+ 1. A regular expression (any object with a match attribute, or a re.Pattern
+ object in Python 3.7+). In this case, the pattern is matched against all
+ lines and the result of re.match(pattern) (a re.Match object since
+ Python 3.7) is returned.
+ 2. A string. In this case pattern is compared to each line with
+ startswith(), and the full matching line is returned on a match.
+ If no match is found None is returned.
+
+ Optionally apply a filter to the line before matching it.
+ """
+
+ if isinstance(line, bytes):
+ line = line.rstrip(b'\r\n')
+ else:
+ line = line.rstrip('\r\n')
+
+ if filter_func is not None:
+ line = filter_func(line)
+
+ # TODO: Check for an instance of re.Pattern once we move to Python 3.7+
+ if hasattr(pattern, "match"):
+ return pattern.match(line)
+ else:
+ if line.startswith(pattern):
+ return line
+
+ return None
+
+
+def filter_remove_device_sw_log_prefix(line):
+ """
+ Remove the file/line information in log messages produced by device software
+ LOG() macros.
+ """
+
+ # See base_log_internal_core() in lib/base/log.c for the format description.
+ pattern = r'^[IWEF?]\d{5} [a-zA-Z0-9\.-_]+:\d+\] '
+ if isinstance(line, bytes):
+ return re.sub(bytes(pattern, encoding='utf-8'), b'', line)
+ else:
+ return re.sub(pattern, '', line)