| #! /usr/bin/env python3 |
| |
| """Elaborate a Spike trace for TBM.""" |
| |
| import argparse |
| import logging |
| import re |
| import sys |
| from typing import IO, Optional, Sequence |
| |
| import flatbuffers |
| |
| # Generated by `flatc`. |
| import FBInstruction.Instructions as FBInstrs |
| |
| import disassembler |
| from instruction import Instruction |
| import utilities |
| from utilities import CallEvery |
| from utilities import FileFormat |
| |
| |
# Module-level logger; main() reports progress through it.
logger = logging.getLogger("gentrace-spike")
| |
| |
class ElaborateTrace:
    """Elaborate a trace from a spike log file.

    The log is read one line at a time. Each line is tried as an instruction
    line, then as a state-change line, then as an exception line; a line that
    matches none of these is ignored. Elaborated instructions are collected in
    an in-memory buffer and written to the output file in batches.
    """

    def __init__(self, input_file: IO, output_file: IO,
                 output_format: FileFormat,
                 output_buffer_size: int,
                 functions: Optional[Sequence[Sequence[int]]]) -> None:
        """Init.

        Args:
          input_file: an open log file
          output_file: an open output file
          output_format: generate json or flatbuffers output.
          output_buffer_size: number of instructions to collect in memory
            before they are written to `output_file` in one batch.
          functions: optional list of PC ranges to include in the output.
            When None, every instruction is included.
        """
        self._input_file = input_file
        self._output_file = output_file
        self._output_format = output_format
        self._functions = functions

        # The current instruction being processed.
        # See the curr_instr @property below.
        self._curr_instr = None

        # Buffer instructions before writing them to the output file.
        self._instrs_buf = []
        self._output_buffer_size = output_buffer_size

        # The number of instructions included in the output trace (some might
        # be buffered).
        self.instr_count = 0

        # If `discard_until` is set to some int, instructions from the trace
        # are discarded until an instruction from address `discard_until` is
        # read from the trace. None means "not discarding"; always compare
        # against None so that address 0 is not mistaken for "unset".
        self._discard_until = None

        # Precompile some REs:
        # Instruction line: core id, address, opcode, mnemonic, operands.
        self._rinstr = re.compile(
            # pylint: disable-next=line-too-long
            r"^core\s+(\d+):\s+0x([0-9a-fA-F]+)\s+\(0x([0-9a-fA-F]+)\)\s*(\S*)\s+(.*)$"
        )
        # State line: core id, the literal "3", address, opcode, and the list
        # of state changes.
        self._rstate = re.compile(
            # pylint: disable-next=line-too-long
            r"^core\s+(\d+):\s+3\s+0x([0-9a-fA-F]+)\s+\(0x([0-9a-fA-F]+)\)\s*(.*)$"
        )
        self._rexception = re.compile(r"^core\s+(\d+):\s+exception\s+(.*)$")
        # Non-architectural CSR prefix, e.g. the "c8_" in "c8_vstart".
        self._rcsr = re.compile(r"c\d+_")

        # Whole-register vector load/store mnemonics. The dot is escaped so
        # that it only matches a literal "." in the mnemonic.
        self._rvloadwhole = re.compile(r"vl(\d)re(\d+)\.v")
        self._rvstorewhole = re.compile(r"vs(\d)r\.v")

    @property
    def curr_instr(self) -> Optional[Instruction]:
        """The instruction currently being elaborated, or None."""
        return self._curr_instr

    @curr_instr.setter
    def curr_instr(self, instr: Instruction) -> None:
        """Make `instr` the current instruction.

        The new instruction is counted in `instr_count`. If there was a
        previous current instruction, it gets `branch_target` set when the
        new address is not the previous address + 4, and it is then moved to
        the output buffer.
        """
        self.instr_count += 1

        if self._curr_instr:
            if instr.addr != self._curr_instr.addr + 4:
                self._curr_instr.branch_target = instr.addr

            self.clear_curr_instr()

        self._curr_instr = instr

    def clear_curr_instr(self) -> None:
        """Move the current instruction (if any) to the output buffer.

        The buffer is written to the output file once it holds
        `output_buffer_size` instructions.
        """
        if self._curr_instr:
            self._instrs_buf.append(self._curr_instr)

            if len(self._instrs_buf) == self._output_buffer_size:
                self.write_to_file()

        self._curr_instr = None

    def run(self) -> None:
        """Process the whole input file and write the elaborated trace."""
        for line in self._input_file:
            if self.try_instruction(line):
                continue

            # While discarding, only instruction lines are of interest (they
            # are what ends the discarding).
            if self._discard_until is not None:
                continue

            if self.try_state(line):
                continue

            if self.try_exception(line):
                continue

        # Flush out the instructions buffer.
        self.clear_curr_instr()
        if self._instrs_buf:
            self.write_to_file()

    def try_instruction(self, line: str) -> bool:
        """Parse the first line of instruction execution.

        Args:
          line: one line of the log.

        Returns:
          True if `line` is an instruction line (even when the instruction is
          discarded or filtered out), False otherwise.
        """
        m = self._rinstr.match(line)
        if not m:
            return False

        addr = int(m.group(2), 16)

        if self._discard_until is not None:
            if addr != self._discard_until:
                # Discard this instruction
                return True

            # We reached the desired location, stop discarding
            # instructions.
            self._discard_until = None

        if self._functions is not None and all(
                addr not in r for r in self._functions):
            # This instruction is not to be included in the output.
            self.clear_curr_instr()
            return True

        opcode = int(m.group(3), 16)
        mnemonic = m.group(4)
        ops = m.group(5).split(", ") if m.group(5) else []
        (inputs, outputs) = disassembler.asm_registers(mnemonic, ops)
        self.curr_instr = Instruction(
            addr=addr,
            opcode=opcode,
            mnemonic=mnemonic,
            operands=ops,
            inputs=inputs,
            outputs=outputs,
            is_nop=disassembler.is_nop(mnemonic),
            is_branch=disassembler.is_branch(mnemonic),
            branch_target=None,
            is_flush=disassembler.is_flush(mnemonic),
            is_vctrl=disassembler.is_vctrl(mnemonic),
            loads=[],
            stores=[],
            lmul=None,
            sew=None,
            vl=None)
        return True

    def try_state(self, line: str) -> bool:
        """Parse the state accesses of the instruction.

        Memory accesses and vector configuration (LMUL, SEW, VL) found on the
        line are recorded on the current instruction.

        Args:
          line: one line of the log.

        Returns:
          True if `line` is a state line (even when there is no current
          instruction to attach it to), False otherwise.

        Raises:
          AssertionError: the line contains an entry of an unexpected kind.
        """
        m = self._rstate.match(line)
        if not m:
            return False

        if self.curr_instr is None:
            return True

        # Apply substitutions to convert non-architectural names for CSRs
        # such as "c8_vstart" to architectural name "vstart".
        changes = self._rcsr.sub(r"", m.group(4))

        # Most entries are simple pairs of "<register> <new value>"
        # but there are exceptions such as vtype changes, memory writes
        # (triples that include the value written), etc.
        # So we parse the entries one at a time
        changes = changes.split()
        i = 0
        while i < len(changes):
            r = changes[i]

            if r == "mem":
                # memory access
                addr = int(changes[i + 1], 16)
                if (len(changes) > i + 2 and
                        changes[i + 2].startswith("0x")):
                    # memory write: "mem <addr> <value>"
                    self.curr_instr.stores.append(addr)
                    i += 3
                else:
                    # memory read: "mem <addr>"
                    self.curr_instr.loads.append(addr)
                    i += 2

            elif r in ("m1", "m2", "m4", "m8"):
                mnemonic = self.curr_instr.mnemonic
                mv = (self._rvloadwhole.match(mnemonic) or
                      self._rvstorewhole.match(mnemonic))
                if mv:
                    # load/store whole vector register: use the digit that
                    # is part of the mnemonic.
                    self.curr_instr.lmul = int(mv.group(1))
                else:
                    # vector LMUL
                    self.curr_instr.lmul = int(r[1])
                i += 1

            elif r in ("mf8", "mf4", "mf2"):
                # vector fractional LMUL
                self.curr_instr.lmul = 1 / int(r[2])
                i += 1

            elif r in ("e8", "e16", "e32", "e64"):
                # vector element size
                self.curr_instr.sew = int(r[1:])
                i += 1

            elif r.startswith(("x", "v", "f")) or r in disassembler.CSRS:
                # register write
                # TODO(sflur): collect those as outputs
                i += 2

            elif r.startswith("l"):
                # vector length
                self.curr_instr.vl = int(r[1:])
                i += 1

            else:
                # There shouldn't be anything we didn't expect. Raise
                # explicitly instead of `assert False`: asserts are removed
                # under `python -O`, and this loop would then never advance.
                raise AssertionError(
                    "unexpected state entry {!r} in line: {!r}".format(
                        r, line))

        return True

    def try_exception(self, line: str) -> bool:
        """Parse exceptions.

        On an exception, the current instruction is moved to the output
        buffer and following instructions are discarded until the address
        right after it (address + 4) shows up in the trace.

        Args:
          line: one line of the log.

        Returns:
          True if `line` is an exception line, False otherwise.
        """
        m = self._rexception.match(line)
        if not m:
            return False

        if self.curr_instr is None:
            return True

        # We don't want to process the exception handling.
        # TODO(sflur): there might be exceptions we do want to process?
        # Note that some of the exceptions are artifacts of Spike, e.g.
        # `write_tohost`.
        # TODO(sflur): instead of using `addr` we can extract epc from
        # `line` (note that epc points to the ecall instruction, and not to
        # the next instruction).
        self._discard_until = self.curr_instr.addr + 4
        self.clear_curr_instr()
        return True

    def write_to_file(self) -> None:
        """Write the buffered instructions to the output file.

        JSON output is one `to_json()` result per line. Flatbuffers output
        is the finished buffer preceded by its length as a 4-byte
        little-endian integer. The instruction buffer is emptied afterwards.
        """
        if self._output_format == FileFormat.JSON:
            instrs = [i.to_json() for i in self._instrs_buf]
            print("\n".join(instrs), file=self._output_file)

        else:
            assert self._output_format == FileFormat.FLATBUFFERS

            builder = flatbuffers.Builder()
            instrs = [i.fb_build(builder) for i in self._instrs_buf]

            # Offsets are prepended, so iterate in reverse to keep the
            # original instruction order in the vector.
            FBInstrs.StartInstructionsVector(builder, len(instrs))
            for x in reversed(instrs):
                builder.PrependUOffsetTRelative(x)
            instrs = builder.EndVector()

            FBInstrs.Start(builder)
            FBInstrs.AddInstructions(builder, instrs)
            instrs = FBInstrs.End(builder)

            builder.Finish(instrs)
            buf = builder.Output()
            self._output_file.write(len(buf).to_bytes(4, byteorder="little"))
            self._output_file.write(buf)

        # Empty the buffer in place.
        self._instrs_buf.clear()
| |
| |
def get_parser() -> argparse.ArgumentParser:
    """Build the command line parser for this script.

    Returns:
      A parser that accepts one positional argument (the input file) and the
      options --cycles, --outfile, --json, --output-buffer-size and
      --verbose/-v. Defaults are shown in the generated help text.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description=__doc__)

    cli.add_argument(
        "--cycles",
        metavar="N",
        type=int,
        help="Maximum length of trace")

    cli.add_argument(
        "--outfile",
        metavar="OFILE",
        default="out.trace",
        help="Output file.")

    cli.add_argument(
        "--json",
        action="store_true",
        help="Write the trace as a sequence of json objects"
        " (instead of flat-buffers).")

    cli.add_argument(
        "--output-buffer-size",
        dest="output_buffer_size",
        metavar="N",
        type=int,
        default=100000,
        help="For efficiency, a buffer in memory collects N"
        " processed instructions, and write all of them to the"
        " output together.")

    # `count` makes `verbose` hold how many times -v was given. That is the
    # conventional way to handle -v, although only two levels are
    # distinguished for now: warning (no -v, the default) and info.
    cli.add_argument(
        "--verbose", "-v",
        action="count",
        default=0,
        help="Increase the verbosity level. By default only"
        " errors and warnings will show. Use '-v' to also show"
        " information messages.")

    cli.add_argument(
        "input_file",
        metavar="IFILE",
        help="ELF file or log file.")

    return cli
| |
| |
def main(argv: Sequence[str]) -> int:
    """Elaborate the log file named on the command line.

    NOTE(review): `--cycles` is accepted by the parser but is not used here.

    Args:
      argv: command line arguments, without the program name.

    Returns:
      Always 0.
    """
    args = get_parser().parse_args(argv)

    # Any -v switches from warnings-only to informational messages.
    utilities.logging_config(
        logging.INFO if args.verbose > 0 else logging.WARNING)

    # JSON output is written as ASCII text; flatbuffers output is binary.
    if args.json:
        fmt, mode, encoding = FileFormat.JSON, "w", "ascii"
    else:
        fmt, mode, encoding = FileFormat.FLATBUFFERS, "wb", None

    with open(args.input_file, "r", encoding="ascii") as input_file, \
            open(args.outfile, mode, encoding=encoding) as output_file:
        gen = ElaborateTrace(input_file, output_file, fmt,
                             args.output_buffer_size, None)

        def report_progress() -> None:
            logger.info("processed %d instructions", gen.instr_count)

        # Presumably CallEvery invokes the callback every 30 seconds while
        # the trace is processed -- confirm against utilities.CallEvery.
        with CallEvery(30, report_progress):
            gen.run()

    return 0
| |
| |
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)