#! /usr/bin/env python3
"""Elaborate a Spike trace for TBM."""
import argparse
import logging
import re
import sys
from typing import IO, Optional, Sequence
import flatbuffers
# Generated by `flatc`.
import FBInstruction.Instructions as FBInstrs
import disassembler
from instruction import Instruction
import utilities
from utilities import CallEvery
from utilities import FileFormat
logger = logging.getLogger("gentrace-spike")
class ElaborateTrace:
"""Elaborate a trace from a spike log file."""
def __init__(self, input_file: IO, output_file: IO,
output_format: FileFormat,
output_buffer_size: int,
functions: Optional[Sequence[Sequence[int]]]) -> None:
"""Init.
Args:
input_file: an open log file
output_file: an open output file
output_format: generate json or flatbuffers output.
functions: optional list of PC ranges to include in the output.
"""
self._input_file = input_file
self._output_file = output_file
self._output_format = output_format
self._functions = functions
# The current instruction being processed.
# See the curr_instr @property below.
self._curr_instr = None
# Buffer instructions before writing them to the output file.
self._instrs_buf = []
self._output_buffer_size = output_buffer_size
# The number of instructions included in the output trace (some might
# be buffered).
self.instr_count = 0
# If `discard_until` is set to some int, instructions from the trace
# are discarded until an instruction from address `discard_until` is
# read from the trace.
self._discard_until = None
# Precompile some REs:
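        # Matches Spike disassembly lines of roughly this shape (illustrative
        # example, not taken from a real log; spacing varies):
        #   core   0: 0x0000000080000004 (0x00000093) li      x1, 0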
self._rinstr = re.compile(
# pylint: disable-next=line-too-long
r"^core\s+(\d+):\s+0x([0-9a-fA-F]+)\s+\(0x([0-9a-fA-F]+)\)\s*(\S*)\s+(.*)$"
)
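        # Matches Spike commit-log ("state change") lines of roughly this
        # shape, where the leading 3 is presumably the privilege level
        # (illustrative example; register formatting varies between versions):
        #   core   0: 3 0x0000000080000004 (0x00000093) x1  0x0000000000000000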
self._rstate = re.compile(
# pylint: disable-next=line-too-long
r"^core\s+(\d+):\s+3\s+0x([0-9a-fA-F]+)\s+\(0x([0-9a-fA-F]+)\)\s*(.*)$"
)
self._rexception = re.compile(r"^core\s+(\d+):\s+exception\s+(.*)$")
self._rcsr = re.compile(r"c\d+_")
        self._rvloadwhole = re.compile(r"vl(\d)re(\d+)\.v")
        self._rvstorewhole = re.compile(r"vs(\d)r\.v")
@property
    def curr_instr(self) -> Optional[Instruction]:
return self._curr_instr
@curr_instr.setter
def curr_instr(self, instr: Instruction) -> None:
self.instr_count += 1
if self._curr_instr:
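            # If the new instruction is not at the sequentially following
            # address, record it as the previous instruction's branch target.
            # This appears to assume fixed 4-byte instructions (no compressed
            # RVC encodings).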
if instr.addr != self._curr_instr.addr + 4:
self._curr_instr.branch_target = instr.addr
self.clear_curr_instr()
self._curr_instr = instr
def clear_curr_instr(self) -> None:
if self._curr_instr:
self._instrs_buf.append(self._curr_instr)
if len(self._instrs_buf) == self._output_buffer_size:
self.write_to_file()
self._curr_instr = None
def run(self) -> None:
for line in self._input_file:
if self.try_instruction(line):
continue
if self._discard_until:
continue
if self.try_state(line):
continue
if self.try_exception(line):
continue
# Flush out the instructions buffer.
self.clear_curr_instr()
if self._instrs_buf:
self.write_to_file()
def try_instruction(self, line: str) -> bool:
"""Parse the first line of instruction execution."""
m = self._rinstr.match(line)
if m:
addr = int(m.group(2), 16)
if self._discard_until:
if addr == self._discard_until:
# We reached the desired location, stop discarding
# instructions.
self._discard_until = None
else:
# Discard this instruction
return True
if self._functions is not None and all(
addr not in r for r in self._functions):
# This instruction is not to be included in the output.
self.clear_curr_instr()
return True
opcode = int(m.group(3), 16)
mnemonic = m.group(4)
ops = m.group(5).split(", ") if m.group(5) else []
(inputs, outputs) = disassembler.asm_registers(mnemonic, ops)
new_instr = Instruction(addr=addr,
opcode=opcode,
mnemonic=mnemonic,
operands=ops,
inputs=inputs,
outputs=outputs,
is_nop=disassembler.is_nop(mnemonic),
is_branch=disassembler.is_branch(mnemonic),
branch_target=None,
is_flush=disassembler.is_flush(mnemonic),
is_vctrl=disassembler.is_vctrl(mnemonic),
loads=[],
stores=[],
lmul=None,
sew=None,
vl=None)
self.curr_instr = new_instr
return True
return False
def try_state(self, line: str) -> bool:
"""Parse the state accesses of the instruction."""
m = self._rstate.match(line)
if m:
if self.curr_instr is None:
return True
changes = m.group(4)
# Apply substitutions to convert non-architectural names for CSRs
# such as "c8_vstart" to architectural name "vstart".
changes = self._rcsr.sub(r"", changes)
# Most entries are simple pairs of "<register> <new value>"
# but there are exceptions such as vtype changes, memory writes
# (triples that include the value written), etc.
            # So we parse the entries one at a time.
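            # For example (illustrative), a store shows up as the triple
            # "mem 0x80002000 0x0000002a", a load as the pair "mem 0x80002000",
            # and a register write as the pair "x1 0x0000002a".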
changes = changes.split()
i = 0
while i < len(changes):
r = changes[i]
if r == "mem":
# memory access
if (len(changes) > i + 2 and
changes[i + 2].startswith("0x")):
# memory write
addr = int(changes[i + 1], 16)
self.curr_instr.stores.append(addr)
i += 3
else:
# memory read
addr = int(changes[i + 1], 16)
self.curr_instr.loads.append(addr)
i += 2
elif r in ["m1", "m2", "m4", "m8"]:
mv = self._rvloadwhole.match(self.curr_instr.mnemonic)
if mv:
# load whole vector register
self.curr_instr.lmul = int(mv.group(1))
else:
mv = self._rvstorewhole.match(self.curr_instr.mnemonic)
if mv:
# store whole vector register
self.curr_instr.lmul = int(mv.group(1))
else:
# vector LMUL
self.curr_instr.lmul = int(r[1])
i += 1
elif r in ["mf8", "mf4", "mf2"]:
# vector fractional LMUL
self.curr_instr.lmul = 1 / int(r[2])
i += 1
elif r in ["e8", "e16", "e32", "e64"]:
# vector element size
self.curr_instr.sew = int(r[1:])
i += 1
elif (r.startswith("x") or r.startswith("v") or
r.startswith("f") or r in disassembler.CSRS):
# register write
# TODO(sflur): collect those as outputs
i += 2
elif r.startswith("l"):
# vector length
self.curr_instr.vl = int(r[1:])
i += 1
else:
# There shouldn't be anything we didn't expect.
                    assert False, f"unexpected state-change entry: {r!r}"
return True
return False
def try_exception(self, line: str) -> bool:
"""Parse exceptions."""
m = self._rexception.match(line)
if m:
if self.curr_instr is None:
return True
# We don't want to process the exception handling.
# TODO(sflur): there might be exceptions we do want to process?
# Note that some of the exceptions are artifacts of Spike, e.g.
# `write_tohost`.
# TODO(sflur): instead of using `addr` we can extract epc from
# `line` (note that epc points to the ecall instruction, and not to
# the next instruction).
self._discard_until = self.curr_instr.addr + 4
self.clear_curr_instr()
return True
return False
def write_to_file(self) -> None:
if self._output_format == FileFormat.JSON:
instrs = [i.to_json() for i in self._instrs_buf]
print("\n".join(instrs), file=self._output_file)
else:
assert self._output_format == FileFormat.FLATBUFFERS
builder = flatbuffers.Builder()
instrs = [i.fb_build(builder) for i in self._instrs_buf]
FBInstrs.StartInstructionsVector(builder, len(instrs))
for x in reversed(instrs):
builder.PrependUOffsetTRelative(x)
instrs = builder.EndVector()
FBInstrs.Start(builder)
FBInstrs.AddInstructions(builder, instrs)
instrs = FBInstrs.End(builder)
builder.Finish(instrs)
buf = builder.Output()
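            # Write a 4-byte little-endian length prefix so a reader can split
            # the stream back into individual FlatBuffers messages.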
self._output_file.write(len(buf).to_bytes(4, byteorder="little"))
self._output_file.write(buf)
        self._instrs_buf.clear()
def get_parser() -> argparse.ArgumentParser:
"""Return a command line parser."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--cycles",
type=int,
help="Maximum length of trace",
metavar="N")
parser.add_argument("--outfile",
default="out.trace",
help="Output file.",
metavar="OFILE")
parser.add_argument("--json",
action="store_true",
help="Write the trace as a sequence of json objects"
" (instead of flat-buffers).")
parser.add_argument("--output-buffer-size",
type=int,
default=100000,
help="For efficiency, a buffer in memory collects N"
" processed instructions, and write all of them to the"
" output together.",
metavar="N",
dest="output_buffer_size")
    # The -v flag is set up so that verbose holds the number of times the flag
    # was used. This is the standard way to use -v, even though at the moment
    # we have only two levels of verbosity: warning (the default, with no -v),
    # and info.
parser.add_argument("--verbose", "-v",
default=0,
action="count",
help="Increase the verbosity level. By default only"
" errors and warnings will show. Use '-v' to also show"
" information messages.")
parser.add_argument("input_file",
help="ELF file or log file.",
metavar="IFILE")
return parser
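# Example invocation (script and file names are hypothetical):
#   ./gentrace_spike.py -v --json --outfile out.trace spike.log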
def main(argv: Sequence[str]) -> int:
parser = get_parser()
args = parser.parse_args(argv)
log_level = logging.WARNING
if args.verbose > 0:
log_level = logging.INFO
utilities.logging_config(log_level)
with open(args.input_file, "r", encoding="ascii") as input_file:
if args.json:
fmt = FileFormat.JSON
mode = "w"
encoding = "ascii"
else:
fmt = FileFormat.FLATBUFFERS
mode = "wb"
encoding = None
with open(args.outfile, mode, encoding=encoding) as output_file:
gen = ElaborateTrace(input_file, output_file, fmt,
args.output_buffer_size, None)
with CallEvery(30, lambda: logger.info("processed %d instructions",
gen.instr_count)):
gen.run()
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))