blob: f1b0598d182c37890c24c716079133c9f5115041 [file] [log] [blame]
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
"""This script runs a GDB test and coordinates the required processes.
It loads a bitstream onto the connected FPGA and then launches OpenOCD, GDB, and
`opentitantool console` in the background. If any of these background processes
exit unsuccessfully, this script will exit with the same status. The one
exception is that we expect to terminate OpenOCD, which may cause it to exit
with a non-zero status.
Note that it is tightly coupled to the `opentitan_gdb_fpga_cw310_test` rule.
"""
import selectors
import subprocess
import sys
import time
from typing import Callable, Dict, List, NewType, Optional, TextIO, Tuple
import rich
import typer
ConsoleStyle = NewType('ConsoleStyle', str)
COLOR_RED = ConsoleStyle('red')
COLOR_GREEN = ConsoleStyle('green')
COLOR_PURPLE = ConsoleStyle('purple')
class BackgroundProcessGroup:
def __init__(self):
self.selector = selectors.DefaultSelector()
self.procs_queue: List[subprocess.Popen] = []
self.names: Dict[subprocess.Popen, str] = {}
self.console = rich.console.Console(color_system="256")
def run(self,
command: List[str],
label: str,
style: ConsoleStyle,
callback: Callable[[str], None] = None) -> subprocess.Popen:
proc = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding='utf-8')
self.procs_queue.append(proc)
self.names[proc] = label
# Register the new process with our selector. The `echo` closure may be
# called multiple times by `maybe_print_output`.
def echo(line):
self.console.print(f"[{label}] ", style=style, end='')
print(line, flush=True)
if callback is not None:
callback(line)
self.selector.register(proc.stdout, selectors.EVENT_READ, echo)
return proc
def empty(self) -> bool:
return len(self.procs_queue) == 0
def pop(self) -> subprocess.Popen:
return self.procs_queue.pop(0)
def push(self, proc: subprocess.Popen) -> None:
return self.procs_queue.append(proc)
def get_name(self, proc: subprocess.Popen) -> str:
return self.names[proc]
def forget(self, proc: subprocess.Popen) -> None:
assert proc not in self.procs_queue
self.maybe_print_output(
timeout_seconds=1) # Flush any remaining lines.
self.selector.unregister(proc.stdout)
self.names.pop(proc, None)
def _block_for_output(self,
timeout_seconds: int) -> List[Tuple[TextIO, str]]:
out = []
events = self.selector.select(timeout=timeout_seconds)
for key, mask in events:
line = key.fileobj.readline().rstrip()
callback = key.data
callback(line)
out.append((key.fileobj, line))
return out
def maybe_print_output(self, timeout_seconds: int) -> None:
self._block_for_output(timeout_seconds)
def block_until_line_contains(self,
proc: subprocess.Popen,
output_fragment: str,
num_seconds: int = 5) -> bool:
"""Block until `proc.stdout` emits a line containing `output_fragment`.
Returns True iff a matching line was seen. Keeps trying for up to
`num_seconds` seconds.
"""
start_time = time.monotonic()
while time.monotonic() <= start_time + num_seconds:
for fileobj, line in self._block_for_output(1):
if fileobj == proc.stdout and output_fragment in line:
return True
return False
app = typer.Typer(pretty_exceptions_enable=False)
@app.command()
def main(rom_kind: str = typer.Option(...),
openocd_path: str = typer.Option(...),
openocd_earlgrey_config: str = typer.Option(...),
openocd_jtag_adapter_config: str = typer.Option(...),
expect_debug_disallowed: bool = typer.Option(None),
gdb_path: str = typer.Option(...),
gdb_expect_output_sequence: List[str] = typer.Option(None),
gdb_script_path: str = typer.Option(...),
bitstream_path: str = typer.Option(...),
opentitantool_path: str = typer.Option(...),
exit_success_pattern: Optional[str] = typer.Option(None),
cw310_uarts: Optional[str] = typer.Option(None)):
opentitantool_prefix = [
opentitantool_path,
"--rcfile=",
"--logging=info",
"--interface=cw310",
]
if cw310_uarts is not None:
opentitantool_prefix.append('--cw310-uarts=' + cw310_uarts)
load_bitstream_command = opentitantool_prefix + [
"fpga",
"load-bitstream",
"--rom-kind=" + rom_kind,
bitstream_path,
]
openocd_command = [
openocd_path,
"-f",
openocd_jtag_adapter_config,
"-c",
"; ".join([
"adapter speed 200",
"transport select jtag",
"reset_config trst_and_srst",
"adapter srst delay 10",
]),
"-f",
openocd_earlgrey_config,
]
gdb_command = [
# For debugging, it may be useful to use `--init-command`, which causes
# GDB to drop to the interactive prompt when the script ends rather than
# exit.
gdb_path,
"--batch",
"--command=" + gdb_script_path,
]
console_command = opentitantool_prefix + [
"console",
"--timeout",
"30s",
]
if exit_success_pattern is not None:
console_command.append("--exit-success=" + exit_success_pattern)
# When `gdb_expect_output_sequence` is non-empty, change the definition of
# success for GDB. If the given lines are a subsequence of GDB's output,
# then GDB was successful, regardless of its exit status.
gdb_alternative_success_mode = gdb_expect_output_sequence != []
def gdb_maybe_consume_expected_line(line: str):
"""Pops the front of `gdb_expect_output_sequence` if `line` matches.
"""
if gdb_expect_output_sequence == []:
return
assert gdb_alternative_success_mode
want_line = gdb_expect_output_sequence[0]
if want_line.rstrip() == line.rstrip():
gdb_expect_output_sequence.pop(0)
# Wait until we've finished loading the bitstream.
subprocess.run(load_bitstream_command, check=True)
# Run OpenOCD, GDB, and the OpenTitanTool console in the background. Wait
# until OpenOCD has fired up its GDB server before launching the GDB client
# to avoid a subtle race condition.
background = BackgroundProcessGroup()
openocd = background.run(openocd_command, "OPENOCD", COLOR_PURPLE)
# For some reason, we don't reliably see the "starting gdb server" line when
# OpenOCD's GDB server is ready. It could be a buffering issue internal to
# OpenOCD or perhaps this script.
examined_riscv_core = background.block_until_line_contains(
openocd, "Examined RISC-V core; found 1 harts", num_seconds=5)
if not examined_riscv_core:
if expect_debug_disallowed:
sys.exit(0)
else:
print("Error: OpenOCD failed to examine the core.", flush=True)
sys.exit(1)
gdb = background.run(gdb_command,
"GDB",
COLOR_GREEN,
callback=gdb_maybe_consume_expected_line)
background.run(console_command, "CONSOLE", COLOR_RED)
while not background.empty():
background.maybe_print_output(timeout_seconds=1)
proc = background.pop()
# If we are defining GDB's success by checking output lines and we've
# seen all of the expected lines, kill GDB and ignore its return code.
if proc == gdb and gdb_alternative_success_mode and gdb_expect_output_sequence == []:
print("Terminating GDB now that it has printed the expected lines")
gdb.terminate()
gdb.wait()
background.forget(gdb)
continue
# When OpenOCD is the only remaining process, send it the TERM signal
# and wait for it to exit. GDB will exit naturally at the end of its
# script. The opentitantool console will either time out or exit due to
# the given success pattern.
if background.empty() and proc == openocd:
openocd.terminate()
openocd.wait()
background.forget(openocd)
break
returncode = proc.poll()
if returncode is None: # (If the process is still running...)
background.push(proc)
continue
print(f"{background.get_name(proc)} exited with code {returncode}")
if returncode != 0:
sys.exit(returncode)
background.forget(proc)
if __name__ == '__main__':
app()