#!/usr/bin/env python3
#
# Copyright (c) 2010-2023 Antmicro
#
# This file is licensed under the MIT License.
# Full license text is available in 'licenses/MIT.txt'.
#
| |
import argparse
import asyncio
import difflib
import enum
import re
import telnetlib
from os import path
from time import time
from typing import Any, Awaitable, Callable, Optional

import pexpect
import psutil
| |
# Default ports used when the user does not override them on the command line.
RENODE_GDB_PORT = 2222
RENODE_TELNET_PORT = 12348

# Matches a hexadecimal literal, e.g. the PC value in GDB's `info registers` output.
RE_HEX = re.compile(r"0x[0-9A-Fa-f]+")
# Register-name patterns used to pick the GDB print command for a register.
RE_VEC_REGNAME = re.compile(r"v\d+")
RE_FLOAT_REGNAME = re.compile(r"f[tsa]\d+")
# Patterns of GDB messages that are reported as errors when seen in a response.
RE_GDB_ERRORS = (
    re.compile(r"\bUndefined .*?\.", re.MULTILINE),
    re.compile(r"\bThe \"remote\" target does not support \".*?\"\.", re.MULTILINE),
    re.compile(r"\bNo symbol \".*?\".*?\.", re.MULTILINE),
    re.compile(r"\bCannot .*$", re.MULTILINE),
    re.compile(r"\bRemote communication error\..*$", re.MULTILINE),
    re.compile(r"\bRemote connection closed", re.MULTILINE),
    re.compile(r"\bThe program has no registers.*?\.", re.MULTILINE),
    re.compile(r"\bThe program is not being run.*?\.", re.MULTILINE),
    re.compile(r"\b.*: cannot resolve name.*$", re.MULTILINE),
    re.compile(r"\b.*: no such file or directory\.", re.MULTILINE),
    re.compile(r"\bArgument required.*?\.", re.MULTILINE),
    re.compile(r'\b.*: .*(not in executable format|file format not recognized)', re.MULTILINE),
    re.compile(r"\bNo symbol table is loaded\.", re.MULTILINE),
)

parser = argparse.ArgumentParser(
    description="Compare Renode execution with hardware/other simulator state using GDB")

# Exactly one way of describing "what to compare" must be given:
# either a raw GDB command or a list of register names.
cmp_parser = parser.add_mutually_exclusive_group(required=True)
cmp_parser.add_argument("-c", "--gdb-command",
                        dest="command",
                        default=None,
                        help="GDB command to run on both instances after each instruction. Outputs of these commands are compared against each other.")
cmp_parser.add_argument("-R", "--register-list",
                        dest="registers",
                        action="store",
                        default=None,
                        help="Sequence of register names to compare. Formatted as ';' separated list of register names, e.g. 'pc;ra'")

parser.add_argument("-r", "--reference-command",
                    dest="reference_command",
                    action="store",
                    required=True,
                    help="Command used to run the GDB server provider used as a reference")
parser.add_argument("-s", "--renode-script",
                    dest="renode_script",
                    action="store",
                    required=True,
                    help="Path to the '.resc' script")
parser.add_argument("-p", "--reference-gdb-port",
                    type=int,
                    dest="reference_gdb_port",
                    action="store",
                    required=True,
                    help="Port on which the reference GDB server can be reached")
parser.add_argument("--renode-gdb-port",
                    type=int,
                    dest="renode_gdb_port",
                    action="store",
                    default=RENODE_GDB_PORT,
                    help="Port on which Renode will communicate with GDB server")
parser.add_argument("-P", "--renode-telnet-port",
                    type=int,
                    dest="renode_telnet_port",
                    action="store",
                    default=RENODE_TELNET_PORT,
                    help="Port on which Renode will communicate with telnet")
parser.add_argument("-b", "--binary",
                    dest="debug_binary",
                    action="store",
                    required=True,
                    help="Path to ELF file with symbols")
parser.add_argument("-x", "--renode-path",
                    dest="renode_path",
                    action="store",
                    default="renode",
                    help="Path to the Renode runscript")
parser.add_argument("-g", "--gdb-path",
                    dest="gdb_path",
                    action="store",
                    default="/usr/bin/gdb",
                    help="Path to the GDB binary to be run")
parser.add_argument("-f", "--start-frame",
                    dest="start_frame",
                    action="store",
                    default=None,
                    help="Sequence of jumps to reach target frame. Formatted as 'addr, occurrence', separated with ';', e.g. '_start,1;printf,7'")
parser.add_argument("-i", "--interest-points",
                    dest="ips",
                    action="store",
                    default=None,
                    help="Sequence of address, interest points, after which state will be compared. Formatted as ';' separated list of hexadecimal addresses, e.g. '0x8000;0x340eba3c'")
parser.add_argument("-S", "--stop-address",
                    dest="stop_address",
                    action="store",
                    default=None,
                    help="Stop condition, if reached script will stop")

# Visual separator printed between sections of the report.
SECTION_SEPARATOR = "=================================================="

# A stack is a list of (address, nth_occurrence) tuples.
# `address` is a PC value, as a hex string, e.g. "0x00f24710".
# `nth_occurrence` is the number of times `address` was reached since the start of execution.
# Therefore, assuming deterministic runtime, an arbitrary program state can be reached
# by setting a breakpoint at `address` and continuing `nth_occurrence` times.
Stack = list[tuple[str, int]]
| |
| |
class Renode:
    """A class for communicating with a remote instance of Renode."""

    def __init__(self, binary: str, port: int):
        """Spawns a new instance of Renode and connects to it through Telnet.

        `binary` is the command used to launch Renode and `port` is the Telnet
        port its Monitor should listen on. Re-raises `pexpect.exceptions.EOF`
        if Renode exits before announcing the Telnet server.
        """
        print(f"* Starting Renode instance on telnet port {port}")
        # making sure there is only one instance of Renode on this port
        # NOTE(review): only the process *name* is inspected here; it presumably
        # does not contain command-line arguments such as the port - confirm
        # whether the command line should be checked instead.
        for p in psutil.process_iter():
            try:
                process_name = p.name().casefold()
            except psutil.Error:
                # The process exited or is inaccessible; it cannot be inspected.
                continue
            if "renode" in process_name and str(port) in process_name:
                print("!!! Found another instance of Renode running on the same port. Killing it before proceeding")
                p.kill()
        try:
            self.proc = pexpect.spawn(f"{binary} --disable-gui --plain --port {port}", timeout=20)
            self.proc.stripcr = True
            self.proc.expect("Monitor available in telnet mode on port")
        except pexpect.exceptions.EOF:
            print("!!! Renode failed to start telnet server! (is --renode-path correct? is --renode-telnet-port available?)")
            raise
        self.connection = telnetlib.Telnet("localhost", port)
        # Sometimes first command does not work, hence we send this dummy one to make sure we got functional connection right after initialization
        self.command("echo 'Connected to GDB comparator'")

    def close(self) -> None:
        """Asks Renode to quit and closes the Telnet connection."""
        try:
            self.command("quit", expected_log="Disposed")
        finally:
            # Release the socket even if sending the command failed.
            self.connection.close()

    def command(self, input: str, expected_log: str = "") -> None:
        """Sends an arbitrary command to the underlying Renode instance.

        If `expected_log` is non-empty, waits until it (treated as a regex)
        appears in Renode's output; on timeout the diagnostics are printed and
        the script exits with status 1. Raises `RuntimeError` if the Renode
        process is no longer alive.
        """
        if not self.proc.isalive():
            print("!!! Renode has died!")
            print("Process:")
            print(str(self.proc))
            raise RuntimeError("Renode process has died")

        input = input + "\n"
        self.connection.write(input.encode())
        if expected_log != "":
            try:
                self.proc.expect([expected_log.encode()])
            except pexpect.TIMEOUT as err:
                print(SECTION_SEPARATOR)
                print(f"Renode command '{input.strip()}' failed!")
                print(f"Expected regex '{expected_log}' was not found")
                print("Process:")
                print(str(self.proc))
                print(SECTION_SEPARATOR)
                print(f"{err=} ; {type(err)=}")
                exit(1)

    def get_output(self) -> bytes:
        """Reads all output from the Telnet connection."""
        return self.connection.read_all()
| |
| |
class GDBInstance:
    """A class for controlling a remote GDB instance."""

    def __init__(self, gdb_binary: str, port: int, debug_binary: str, name: str, target_process: pexpect.spawn):
        """Spawns a new GDB instance and connects to it.

        `name` is only used in log messages; `target_process` is the process
        GDB attaches to (its liveness is checked before every command).
        """
        self.dimensions = (0, 4096)
        self.name = name
        self.last_cmd = ""
        self.last_output = ""
        # Pending awaitable of the last asynchronous command (None if there is none).
        self.task: Optional[Awaitable[Any]] = None
        self.target_process = target_process
        print(f"* Connecting {self.name} GDB instance to target on port {port}")
        # 120 s is the default pexpect timeout used whenever a call does not pass its own.
        self.process = pexpect.spawn(f"{gdb_binary} --silent --nx --nh", timeout=120, dimensions=self.dimensions)
        self.run_command("clear", async_=False)
        self.run_command("set pagination off", async_=False)
        self.run_command(f"file {debug_binary}", async_=False)
        self.run_command(f"target remote :{port}", async_=False)

    def close(self) -> None:
        """Closes the underlying GDB instance."""
        self.run_command("quit", dont_wait_for_output=True, async_=False)

    def progress_by(self, delta: int, type: str = "stepi") -> None:
        """Steps `delta` times using the GDB command named by `type`."""
        steps = int(delta)
        # Give long step sequences proportionally more time to finish.
        adjusted_timeout = max(120, steps / 5)
        self.run_command(type + (f" {delta}" if steps > 1 else ""), timeout=adjusted_timeout)

    def get_symbol_at(self, addr: str) -> str:
        """Returns the name of the symbol which is stored at `addr` (`info symbol`).

        Returns an empty string if GDB produced no output.
        """
        self.run_command(f"info symbol {addr}", async_=False)
        lines = self.last_output.splitlines()
        return lines[-1] if lines else ""

    def delete_breakpoints(self) -> None:
        """Deletes breakpoints by issuing GDB's `clear` command."""
        # NOTE(review): per the GDB manual, `clear` without arguments only removes
        # breakpoints at the next instruction to be executed; `delete` removes all
        # of them (but asks for confirmation) - confirm which one is intended.
        self.run_command("clear", async_=False)

    def run_command(self, command: str, timeout: float = 10, confirm: bool = False, dont_wait_for_output: bool = False, async_: bool = True) -> None:
        """Send an arbitrary command to the underlying GDB instance.

        With `async_=True` the pending pexpect awaitable is stored in
        `self.task` and must be completed with `expect()`. With `async_=False`
        the output up to the next prompt is collected into `self.last_output`.
        `confirm` answers a "(y or n)" prompt with "y". `dont_wait_for_output`
        returns right after the command is written.

        Raises `RuntimeError` if GDB or its target is dead, and re-raises
        `pexpect.TIMEOUT` / `pexpect.exceptions.EOF` after printing diagnostics.
        """
        if not self.process.isalive():
            print(f"!!! The {self.name} GDB process has died!")
            print("Process:")
            print(str(self.process))
            self.last_output = ""
            raise RuntimeError(f"The {self.name} GDB process has died")
        if not self.target_process.isalive():
            print(f"!!! {self.name} GDB's target has died!")
            print("Target process:")
            print(str(self.target_process))
            self.last_output = ""
            raise RuntimeError(f"{self.name} GDB's target has died")

        self.last_cmd = command
        self.process.write(command + "\n")
        if dont_wait_for_output:
            return
        try:
            if not confirm:
                # First wait for the echoed command followed by some output.
                result = self.process.expect(re.escape(command) + r".+\n", timeout, async_=async_)
                self.task = result if async_ else None
                if not async_:
                    self.last_output = ""
                    line = self.process.match[0].decode().strip("\r")
                    # Keep consuming output until the next prompt shows up.
                    while "(gdb)" not in line:
                        self.last_output += line
                        self.process.expect([r".+\n", r"\(gdb\)"], timeout)
                        line = self.process.match[0].decode().strip("\r")
                    self.validate_response(self.last_output)
            else:
                self.process.expect("[(]y or n[)]")
                self.process.writelines("y")
                result = self.process.expect("[(]gdb[)]", async_=async_)
                self.task = result if async_ else None
                self.last_output = self.process.match[0].decode().strip("\r")

        except pexpect.TIMEOUT:
            print(f"!!! {self.name} GDB: Command '{command}' timed out!")
            print("Process:")
            print(str(self.process))
            self.last_output = ""
            raise
        except pexpect.exceptions.EOF:
            print(f"!!! {self.name} GDB: pexpect encountered an unexpected EOF (is --gdb-path correct?)")
            print("Process:")
            print(str(self.process))
            self.last_output = ""
            raise

    def print_stack(self, stack: Stack) -> None:
        """Prints a stack, resolving each address to a symbol."""
        print("Address\t\tOccurrence\t\tSymbol")
        for address, occurrence in stack:
            print(f"{address}\t{occurrence}\t{self.get_symbol_at(address)}")

    def validate_response(self, response: str) -> None:
        """Scans a GDB response for common error messages and prints any that are found."""
        for regex in RE_GDB_ERRORS:
            err_match = regex.search(response)
            if err_match is not None:
                print(f"!!! {self.name} GDB: {err_match[0].strip()} (last command: \"{self.last_cmd}\")")
                # Assuming we correctly identified a GDB error, this would be
                # the right place to terminate execution. However, there is
                # a risk of a false positive, so it's safer not to (if it is
                # a critical error, it will most likely cause a timeout anyway).

    async def get_pc(self) -> str:
        """Returns the value of the PC register, as a hex string.

        Raises `TypeError` if no hexadecimal value is found in GDB's response.
        """
        self.run_command("i r pc")
        await self.expect()
        pc_match = RE_HEX.search(self.last_output)
        if pc_match is None:
            raise TypeError(f"{self.name} GDB: could not find a PC value in: {self.last_output!r}")
        return pc_match[0]

    async def expect(self, timeout: float = 10) -> None:
        """Await execution of the last command to finish and update `self.last_output`.

        Re-raises `pexpect.TIMEOUT` after printing diagnostics.
        """
        try:
            await self.task
            line = self.process.match[0].decode().strip("\r")
            self.last_output = ""
            # Keep consuming output until the next prompt shows up.
            while "(gdb)" not in line:
                self.last_output += line
                self.task = self.process.expect([r".+\n", r"\(gdb\)"], timeout, async_=True)
                await self.task
                line = self.process.match[0].decode().strip("\r")
            self.validate_response(self.last_output)

        except pexpect.TIMEOUT:
            print(f"!!! {self.name} GDB: Command '{self.last_cmd}' timed out!")
            print("Process:")
            print(str(self.process))
            self.last_output = ""
            raise
| |
| |
class GDBComparator:
    """A helper class to aggregate control over 2 `GDBInstance` objects."""

    # Name of the user-defined GDB command that prints the requested registers.
    COMMAND_NAME = "gdb_compare__print_registers"
    # Cached GDB script defining COMMAND_NAME; built on first use.
    COMMANDS = None

    # REGISTER_CASES is an ordered list of (condition_func, cmd_builder_func) tuples.
    # It is used to assign registers to groups based on their type, and for each such group
    # have a dedicated function that constructs a gdb command to pretty-print those registers.
    # Each tuple in REGISTER_CASES represents a group of registers. condition_func is used to
    # determine whether a register belongs to the group. cmd_builder_func intakes a list of
    # registers belonging to the group and returns a GDB command to print all their values.
    # The order of tuples matters - only the first match is used.
    RegNameTester = Callable[[str], bool]  # condition_func type
    CommandsBuilder = Callable[[list[str]], list[str]]  # cmd_builder_func type
    REGISTER_CASES: list[tuple[RegNameTester, CommandsBuilder]] = [
        (lambda reg: RE_VEC_REGNAME.fullmatch(reg) is not None, lambda regs: [f"p/x (char[])${reg}.b" for reg in regs]),
        (lambda reg: RE_FLOAT_REGNAME.fullmatch(reg) is not None, lambda regs: [f"p/x (char[])${reg}" for reg in regs]),
        (lambda _: True, lambda regs: ["printf \"" + ": 0x%x\\n".join(regs) + ": 0x%x\\n\",$" + ",$".join(regs)]),
    ]

    def __init__(self, args: argparse.Namespace, renode_proc: pexpect.spawn, ref_proc: pexpect.spawn):
        """Creates 2 `GDBInstance` objects, one expecting to connect on port `args.renode_gdb_port` and the other on `args.reference_gdb_port`."""
        self.instances = [
            GDBInstance(args.gdb_path, args.renode_gdb_port, args.debug_binary, "Renode", renode_proc),
            GDBInstance(args.gdb_path, args.reference_gdb_port, args.debug_binary, "Reference", ref_proc),
        ]
        if args.command:
            self.cmd = args.command
        else:
            # Tolerate whitespace around names and empty entries (e.g. a trailing ';').
            stripped = (reg.strip() for reg in args.registers.split(";"))
            self.cmd = self.build_command_from_register_list([reg for reg in stripped if reg])

    def close(self) -> None:
        """Closes all owned instances."""
        for i in self.instances:
            i.close()

    def build_command_from_register_list(self, regs: list[str]) -> str:
        """Defines a custom gdb command for pretty-printing all registers and returns its name."""
        if GDBComparator.COMMANDS is None:
            # Assign registers to groups based on the RegNameTester functions
            reg_groups: dict[GDBComparator.CommandsBuilder, list[str]] = {}
            for reg in regs:
                for test, cmds_builder in GDBComparator.REGISTER_CASES:
                    if test(reg):
                        reg_groups.setdefault(cmds_builder, []).append(reg)
                        break

            # Compose a gdb script that defines a custom command for printing all groups of registers
            GDBComparator.COMMANDS = [
                f"define {GDBComparator.COMMAND_NAME}",
                *[cmd for cmds_builder, reg_group in reg_groups.items() for cmd in cmds_builder(reg_group)],
                "end"
            ]

        # Warn if for any GDBInstance there is a register that was requested by the user
        # but does not appear in the output of "info registers all"
        for i in self.instances:
            i.run_command("i r all", async_=False)
            # The first and last lines are skipped; blank lines carry no register name.
            reported_regs = {line.split()[0] for line in i.last_output.split("\n")[1:-1] if line.strip()}
            not_found = [reg for reg in regs if reg not in reported_regs]
            if not_found:
                print("WARNING: " + ", ".join(not_found) + " register[s] not found when executing 'info registers all' for " + i.name)

        # Define the custom command
        commands = GDBComparator.COMMANDS
        for i in self.instances:
            # Only the closing "end" is waited for; the earlier lines are just written.
            for cmd in commands[:-1]:
                i.run_command(cmd, dont_wait_for_output=True, async_=False)
            i.run_command(commands[-1], async_=False)

        return GDBComparator.COMMAND_NAME

    def delete_breakpoints(self) -> None:
        """Deletes breakpoints in all owned instances."""
        for i in self.instances:
            i.delete_breakpoints()

    def get_symbol_at(self, addr: str) -> str:
        """Returns the name of the symbol which is stored at `addr` (`info symbol`), as seen by the first instance."""
        return self.instances[0].get_symbol_at(addr)

    def print_stack(self, stack: Stack) -> None:
        """Prints a stack, using the first instance to resolve symbols."""
        return self.instances[0].print_stack(stack)

    async def run_command(self, cmd: Optional[str] = None, **kwargs: Any) -> list[str]:
        """Sends an arbitrary command to all owned instances and returns a list of outputs.

        When `cmd` is empty the comparison command (`self.cmd`) is used.
        `kwargs` are forwarded to `GDBInstance.run_command`.
        """
        cmd = cmd if cmd else self.cmd
        for i in self.instances:
            i.run_command(cmd, **kwargs)
        # `GDBInstance.expect` only understands `timeout`; do not forward anything else.
        expect_kwargs = {"timeout": kwargs["timeout"]} if "timeout" in kwargs else {}
        await asyncio.gather(*[i.expect(**expect_kwargs) for i in self.instances])
        return [i.last_output for i in self.instances]

    async def get_pcs(self) -> list[str]:
        """Returns a list containing the values of PC registers of all owned instances, as hex strings."""
        return await asyncio.gather(*[i.get_pc() for i in self.instances])

    async def progress_by(self, delta: int, type: str = "stepi") -> None:
        """Steps `delta` times in all owned instances using the GDB command named by `type`."""
        steps = int(delta)
        # Give long step sequences proportionally more time to finish.
        adjusted_timeout = max(120, steps / 5)
        await self.run_command(type + (f" {delta}" if steps > 1 else ""), timeout=adjusted_timeout)

    async def compare_instances(self, previous_pc: str) -> None:
        """Prints a comparison of the execution states of all owned instances.

        `previous_pc` is the address whose instruction is disassembled in the
        "Opcode at previous pc" section.
        """
        for name, command in [("Opcode at previous pc", f"x/i {previous_pc}"), ("Frame", "frame"), ("Registers", "info registers all")]:
            print("*** " + name + ":")
            GDBComparator.compare_outputs(await self.run_command(command))

    @staticmethod
    def compare_outputs(outputs: list[str]) -> None:
        """Prints a comparison of two output strings (same & different values).

        Each output line is split into a name (its first whitespace-delimited
        word) and the rest of the line. Only names present in both outputs are
        reported.
        """
        assert len(outputs) == 2
        parsed: list[dict[str, str]] = []
        for output in outputs:
            values: dict[str, str] = {}
            # Truncate 1st elements in outputs, because it's the repl
            for line in output.split("\n")[1:]:
                # Splitting the stripped line keeps indented lines from being cut mid-word.
                fields = line.split(maxsplit=1)
                if not fields:
                    continue
                values[fields[0]] = fields[1].strip() if len(fields) > 1 else ""
            parsed.append(values)
        renode_values, reference_values = parsed

        same: list[str] = []
        different: list[str] = []
        for name, renode_value in renode_values.items():
            if name not in reference_values:
                continue
            reference_value = reference_values[name]
            if renode_value != reference_value:
                different.append(f">> {name}:\n" + string_compare(renode_value, reference_value) + "\n")
            else:
                same.append(f">> {name}:\t{renode_value}\n")

        output_same = "".join(same)
        output_different = "".join(different)
        if not output_different:
            print("Same:")
            print(output_same)
        else:
            print("Same values:")
            print(output_same)
            print("Different values:")
            print(output_different)
| |
| |
def setup_processes(args: argparse.Namespace) -> tuple[Renode, pexpect.spawn, GDBComparator]:
    """Starts everything one comparison iteration needs.

    Launches the reference process, starts Renode and loads the script into it,
    starts Renode's GDB server, connects a `GDBComparator` to both targets and
    finally starts the emulation. Returns the handles as
    (renode, reference, gdb_comparator).
    """
    reference_proc = pexpect.spawn(args.reference_command, timeout=10)

    renode_instance = Renode(args.renode_path, args.renode_telnet_port)
    script_path = path.abspath(args.renode_script)
    renode_instance.command(f"include @{script_path}", expected_log="System bus created")

    gdb_port = args.renode_gdb_port
    renode_instance.command(
        f"machine StartGdbServer {gdb_port}",
        expected_log=f"started on port :{gdb_port}",
    )

    # Both GDB instances attach before the emulation is started.
    comparator = GDBComparator(args, renode_instance.proc, reference_proc)
    renode_instance.command("start")
    return renode_instance, reference_proc, comparator
| |
def string_compare(renode_string: str, reference_string: str) -> str:
    """Returns a pretty diff of two single-line strings.

    ANSI colour sequences already present in the inputs are removed first.
    Parts of `renode_string` that differ are then highlighted in bold green
    and parts of `reference_string` that differ in bold red. The result has
    two lines: "Renode: ..." and "Reference: ...".
    """
    BOLD = "\033[1m"
    END = "\033[0m"
    RED = "\033[91m"
    GREEN = "\033[92m"

    # Also accept multi-parameter SGR sequences such as "\x1b[31;1m".
    ansi_sgr = r"\x1b\[[0-9;]*m"
    renode_string = re.sub(ansi_sgr, "", renode_string)
    reference_string = re.sub(ansi_sgr, "", reference_string)

    renode_parts: list[str] = []
    reference_parts: list[str] = []
    diff = difflib.SequenceMatcher(None, renode_string, reference_string)

    # The opcodes cover both strings from left to right without gaps, so the
    # highlighted strings can be assembled piece by piece.
    for tag, s1_start, s1_end, s2_start, s2_end in diff.get_opcodes():
        renode_piece = renode_string[s1_start:s1_end]
        reference_piece = reference_string[s2_start:s2_end]
        if tag in ("replace", "delete"):
            renode_piece = GREEN + BOLD + renode_piece + END
        if tag in ("replace", "insert"):
            reference_piece = RED + BOLD + reference_piece + END
        renode_parts.append(renode_piece)
        reference_parts.append(reference_piece)

    return f"Renode: {''.join(renode_parts)}\nReference: {''.join(reference_parts)}"
| |
| |
class CheckStatus(enum.IntEnum):
    """Possible outcomes of the `check` function.

    Members are integers, so they still compare equal to their plain values.
    """
    STOP = 1      # the stop address was reached
    CONTINUE = 2  # outputs matched; keep stepping
    FOUND = 3     # the faulting instruction was identified
    MISMATCH = 4  # outputs differ; a new stack entry was recorded
| |
| |
async def check(stack: Stack, gdb_comparator: GDBComparator, previous_pc: str, previous_output: str, steps_count: int, exec_count: dict[str, int], time_of_start: float, args: argparse.Namespace) -> tuple[str, str, int]:
    """Compares the current state of both GDB instances.

    Reads both PCs, runs the comparison command on both instances and compares
    the outputs line by line. Returns `(previous_pc, previous_output, status)`
    where `status` is one of the `CheckStatus` values:

    * `STOP` - Renode's PC equals `args.stop_address`.
    * `CONTINUE` - PCs and outputs match; the returned PC/output are the current ones.
    * `FOUND` - the faulting instruction was identified and a report was printed.
    * `MISMATCH` - a difference was found; a new entry was appended to `stack`.

    `exec_count` maps PC values to the number of times they were seen and is
    updated in place.
    """
    from itertools import zip_longest

    ren_pc, pc = await gdb_comparator.get_pcs()
    pc_mismatch = pc != ren_pc
    if pc_mismatch:
        print("Renode and reference PC differs!")
        print(string_compare(ren_pc, pc))
        print(f"\tPrevious PC: {previous_pc}")
    exec_count[pc] = exec_count.get(pc, 0) + 1

    if args.stop_address is not None and int(ren_pc, 16) == args.stop_address:
        print("stop address reached")
        return previous_pc, previous_output, CheckStatus.STOP

    if not pc_mismatch:
        output_ren, output_reference = map(lambda s: s.splitlines(), await gdb_comparator.run_command())

        # zip_longest makes a missing line on either side count as a difference.
        for line_no, (ren_line, ref_line) in enumerate(zip_longest(output_ren, output_reference), start=1):
            if ren_line != ref_line:
                print(SECTION_SEPARATOR)
                print(f"!!! Difference in line {line_no} of output:")
                print(string_compare(ren_line or "", ref_line or ""))
                print(f"Previous: {previous_output}")
                break
        else:
            if steps_count % 10 == 0:
                print(f"{steps_count} steps; current pc = {pc} {gdb_comparator.get_symbol_at(pc)}")
            previous_pc = pc
            # The first line is the echoed command, not part of the compared state.
            previous_output = "\n".join(output_ren[1:])
            return previous_pc, previous_output, CheckStatus.CONTINUE

    if pc_mismatch or (len(stack) > 0 and previous_pc == stack[-1][0]):
        print(SECTION_SEPARATOR)
        print("Found faulting insn at " + previous_pc + " " + gdb_comparator.get_symbol_at(previous_pc))
        elapsed_time = time() - time_of_start
        steps_per_second = steps_count / elapsed_time if elapsed_time > 0 else 0.0
        print(f"Took {elapsed_time:.2f} seconds [~ {steps_per_second:.2f} steps/sec]")
        print(SECTION_SEPARATOR)
        print("*** Stack:")
        gdb_comparator.print_stack(stack)
        print("*** Gdb command:")
        # In register-list mode there is no user command; show the one that was used.
        print(args.command or gdb_comparator.cmd)
        print(SECTION_SEPARATOR)
        print("Gdb instances comparision:")
        await gdb_comparator.compare_instances(previous_pc)

        return previous_pc, previous_output, CheckStatus.FOUND

    if previous_pc not in exec_count:
        previous_pc = pc
    print("Found point after which state is different. Adding to `stack` for later iterations")
    occurrence = exec_count[previous_pc]
    print(f"\tAddress: {previous_pc}\n\tOccurrence: {occurrence}")
    stack.append((previous_pc, occurrence))
    # Rebinding the local name would not touch the caller's dict; clear it in place.
    exec_count.clear()
    print(SECTION_SEPARATOR)

    return previous_pc, previous_output, CheckStatus.MISMATCH
| |
def _close_processes(gdb_comparator: GDBComparator, renode: Renode, reference: pexpect.spawn) -> None:
    """Shuts down both GDB instances, Renode and the reference process (in that order)."""
    gdb_comparator.close()
    renode.close()
    reference.close(force=True)


async def main() -> None:
    """Script entry point.

    Repeatedly restarts Renode and the reference, replays the recorded stack of
    (address, occurrence) breakpoints and then steps both targets in lockstep
    until `check` reports that the faulting instruction was found or the stop
    address was reached. Invalid command-line values are reported through
    `parser.error`.
    """
    args = parser.parse_args()
    ports = {
        "reference GDB": args.reference_gdb_port,
        "Renode GDB": args.renode_gdb_port,
        "Renode Telnet": args.renode_telnet_port,
    }
    for label, port in ports.items():
        if not 0 <= port <= 65535:
            parser.error(f"Illegal {label} port")
    # A set catches every pairwise clash, including first vs. third.
    if len(set(ports.values())) != len(ports):
        parser.error("Overlapping port numbers")
    if args.stop_address is not None:
        try:
            args.stop_address = int(args.stop_address, 16)
        except ValueError:
            parser.error(f"Invalid stop address: '{args.stop_address}'")

    pcs = [args.stop_address] if args.stop_address is not None else []
    if args.ips:
        pcs.extend(args.ips.split(";"))

    # With interest points we run between breakpoints; otherwise we single-step.
    execution_cmd = "continue" if args.ips else "nexti"
    print(SECTION_SEPARATOR)
    time_of_start = time()
    previous_pc = "Unknown"
    previous_output = "Unknown"
    steps_count = 0
    iterations_count = 0
    stack: Stack = []

    if args.start_frame is not None:
        for jump in args.start_frame.split(";"):
            addr, occur = jump.split(",")
            stack.append((addr.strip(), int(occur.strip())))

    insn_found = False

    while not insn_found:
        iterations_count += 1
        print("Preparing processes for iteration number " + str(iterations_count))
        renode, reference, gdb_comparator = setup_processes(args)
        if len(stack) != 0:
            print("Recreating stack; jumping to breakpoint at:")
            for address, count in stack:
                print("\t" + address + ", " + str(count) + " occurrence")
                await gdb_comparator.run_command(f"break *{address}")

                for _ in range(count):
                    await gdb_comparator.run_command("continue", timeout=120)

                gdb_comparator.delete_breakpoints()
            print("Stepping single instruction")
            await gdb_comparator.progress_by(1)

        for pc in pcs:
            await gdb_comparator.run_command(f"br *{pc}")

        exec_count: dict[str, int] = {}
        print("Starting execution")
        while True:
            await gdb_comparator.run_command(execution_cmd)
            steps_count += 1

            previous_pc, previous_output, status = await check(stack, gdb_comparator, previous_pc, previous_output, steps_count, exec_count, time_of_start, args)

            if status == CheckStatus.CONTINUE and execution_cmd == "continue":
                # After reaching an interest point, also compare the state one instruction later.
                await gdb_comparator.run_command("stepi")
                steps_count += 1

                previous_pc, previous_output, status = await check(stack, gdb_comparator, previous_pc, previous_output, steps_count, exec_count, time_of_start, args)

            if status == CheckStatus.CONTINUE:
                continue
            if status not in (CheckStatus.STOP, CheckStatus.FOUND, CheckStatus.MISMATCH):
                exit(1)

            # Every remaining outcome ends this iteration, so release its processes.
            _close_processes(gdb_comparator, renode, reference)
            if status == CheckStatus.STOP:
                return
            if status == CheckStatus.FOUND:
                insn_found = True
            else:
                # Narrow down the mismatch instruction by instruction from now on.
                execution_cmd = "nexti"
            break
| |
if __name__ == "__main__":
    # Drive the comparison to completion, then report success to the shell.
    asyncio.run(main())
    exit(0)