gentrace-spike: elaborates Spike traces
Elaborates Spike traces with all the information TBM needs. The disassembler
part is a bit of a hack. It should be possible to extend Spike to output the
registers each instruction reads, not just the registers it writes (as it
already does for memory accesses), and then the disassembler will not be required.
Change-Id: I427d87aad7725903c93b5b0fab8b0bb22866b458
diff --git a/disassembler.py b/disassembler.py
new file mode 100644
index 0000000..7e0d8f2
--- /dev/null
+++ b/disassembler.py
@@ -0,0 +1,385 @@
+import re
+from typing import Optional, Sequence, Tuple
+
+# Precompiled regexes; stores include vs<N>r.v, offsets may be negative.
+RE_RVV_STORE = re.compile(r"(vse|vsuxei|vsse|vsoxei)\d+|vs\dr\.v")
+RE_ADDR_OFF1 = re.compile(r"^-?\d*\((\w+)\)$")
+RE_ADDR_OFF2 = re.compile(r"^(\w+)\s*[-+]\s*(\d+|0x[0-9a-fA-F]+)$")
+RE_IMM = re.compile(r"^(-?\d+|0x[0-9a-fA-F]+)$")
+
+
+def asm_registers(
+        mnemonic: str,
+        operands: Sequence[str]) -> Tuple[Sequence[str], Sequence[str]]:
+    """Generate list of inputs and output registers from operands.
+
+    Args:
+      mnemonic: assembly instruction mnemonic
+      operands: operands
+
+    Returns:
+      inputs: input registers
+      outputs: output registers
+
+    TODO(sflur): The implementation is very incomplete. Instead of parsing the
+    disassembled instruction, which is too much work, the functional simulator
+    should provide the set of registers it read from (input) and wrote to
+    (output), for every instruction. Spike only provides the register writes!
+    Alternatively, use an external library to parse the instruction (machine
+    code or disassembled).
+    """
+
+    if (mnemonic in ["sb", "sh", "sw", "sd", "sbu", "shu", "fsw", "fsd"] or
+            RE_RVV_STORE.match(mnemonic)):
+        # store
+        input_ops = operands
+        output_ops = []
+    elif (mnemonic in ["j", "jr", "c.j"] or mnemonic.startswith("b")):
+        # jump/branch
+        input_ops = operands
+        output_ops = []
+    elif (mnemonic in ["jal", "jalr"] and len(operands) == 1):
+        # pseudo-instructions
+        input_ops = operands
+        output_ops = ["x1"]
+    else:
+        # default behaviour: first operand is destination, remainder are inputs
+        input_ops = operands[1:]
+        output_ops = operands[:1]
+
+    inputs = [r for r in [input_reg(o) for o in input_ops] if r]
+    outputs = [o for o in output_ops if o and not o[0].isdigit()]
+
+    # Add implicit inputs and outputs of instructions
+    if mnemonic.startswith("vset"):
+        outputs.extend(["vtype", "vl"])
+    elif mnemonic.startswith("v"):
+        inputs.extend(["vtype", "vl", "vstart"])
+
+    return (normalize(inputs), normalize(outputs))
+
+
+def input_reg(operand: str) -> Optional[str]:
+ """Extract a register from an input operand.
+
+ Discards immediate operands, branch address, etc.
+
+ Args:
+ operand: input operand
+
+ Returns:
+ register or None
+ """
+
+ # address offset of the form "imm(reg)": keep the base register
+ m = RE_ADDR_OFF1.match(operand)
+ if m:
+ return m.group(1)
+
+ # address offset of the form "reg + imm" / "reg - imm": keep reg
+ m = RE_ADDR_OFF2.match(operand)
+ if m:
+ return m.group(1)
+
+ # discard immediate operand
+ if RE_IMM.match(operand):
+ return None
+
+ # default: it's probably a register
+ return operand
+
+
+# table of RISC-V ABI names
+ABI_NAMES = {
+ "zero": "x0",
+ "ra": "x1",
+ "sp": "x2",
+ "gp": "x3",
+ "tp": "x4",
+ "t0": "x5",
+ "t1": "x6",
+ "t2": "x7",
+ "s0": "x8",
+ "s1": "x9",
+ "a0": "x10",
+ "a1": "x11",
+ "a2": "x12",
+ "a3": "x13",
+ "a4": "x14",
+ "a5": "x15",
+ "a6": "x16",
+ "a7": "x17",
+ "s2": "x18",
+ "s3": "x19",
+ "s4": "x20",
+ "s5": "x21",
+ "s6": "x22",
+ "s7": "x23",
+ "s8": "x24",
+ "s9": "x25",
+ "s10": "x26",
+ "s11": "x27",
+ "t3": "x28",
+ "t4": "x29",
+ "t5": "x30",
+ "t6": "x31",
+
+ # This is the RVV mask register (not exactly abi).
+ "v0.t": "v0",
+}
+
+# names normalize() discards: x0 and vsetvli-style tokens (SEW, LMUL, ta/ma)
+BOGUS_REGISTERS = {
+ "x0",
+ "e8",
+ "e16",
+ "e32",
+ "e64",
+ "e128",
+ "m1",
+ "m2",
+ "m4",
+ "m8",
+ "m16",
+ "ta",
+ "tu",
+ "ma",
+ "mu",
+}
+
+
+def normalize(rs: Sequence[str]) -> Sequence[str]:
+    """Replace ABI register names with their architectural names (removing x0).
+
+    Also, removes duplicates and the other BOGUS_REGISTERS entries.
+
+    Args:
+      rs: list of registers.
+    Returns:
+      sorted list of registers (a set's order would vary from run to run).
+    """
+    return sorted({ABI_NAMES.get(r, r) for r in rs} - BOGUS_REGISTERS)
+
+
+NOPS = {
+ "nop",
+ "c.nop",
+ "fence",
+ "fence.i",
+ "sfence.vma",
+ "wfi",
+}
+
+
+def is_nop(mnemonic):
+ """Test whether an instruction mnemonic is a NOP in TBM sense.
+
+ A NOP instruction is any instruction that TBM will retire without placing
+ in a dispatch queue.
+ """
+ return mnemonic in NOPS
+
+
+# List of all known branch instructions
+BRANCHES = {
+ "beq",
+ "bne",
+ "blt",
+ "bge",
+ "bltu",
+ "bgeu",
+ "jal",
+ "jalr",
+ "bnez",
+ "beqz",
+ "blez",
+ "bgez",
+ "bltz",
+ "bgtz",
+ "bleu",
+ "bgtu",
+ "j",
+ "c.j",
+ "jr",
+ "ret",
+ "sret",
+ "mret",
+ "ecall",
+ "ebreak",
+}
+
+
+def is_branch(mnemonic: str) -> bool:
+ """Test whether an instruction mnemonic is a branch."""
+ return mnemonic in BRANCHES
+
+
+FLUSHES = {
+ "csrr",
+ "csrw",
+ "csrs",
+ "csrwi",
+ "csrrw",
+ "csrrs",
+ "csrrc",
+ "csrrwi",
+ "csrrsi",
+ "csrrci",
+ "fence",
+ "fence.i",
+ "sfence.vma",
+}
+
+
+def is_flush(mnemonic: str) -> bool:
+ """Test whether an instruction mnemonic is a flush in TBM sense.
+
+ A flush instruction is any instruction that should be placed in a dispatch
+ queue (or retired, see is_nop) only when the pipeline is empty.
+ """
+ return mnemonic in FLUSHES
+
+
+VCTRL = {
+ "vsetivli",
+ "vsetvli",
+ "vsetvl",
+}
+
+
+def is_vctrl(mnemonic: str) -> bool:
+ """Test whether an instruction mnemonic is a vctrl."""
+ return mnemonic in VCTRL
+
+
+# List of control/status registers (incomplete)
+CSRS = {
+ "cycle",
+ "cycleh",
+ "dcsr",
+ "dpc",
+ "dscratch0",
+ "dscratch1",
+ "fcsr",
+ "fflags",
+ "frm",
+ "hcounteren",
+ "hedeleg",
+ "hgatp",
+ "hgeie",
+ "hgeip",
+ "hideleg",
+ "hie",
+ "hip",
+ "hstatus",
+ "htimedelta",
+ "htimedeltah",
+ "htinst",
+ "htval",
+ "hvip",
+ "instret",
+ "instreth",
+ "marchid",
+ "mcause",
+ "mcontext",
+ "mcounteren",
+ "mcountinhibit",
+ "mcycle",
+ "medeleg",
+ "mepc",
+ "mhartid",
+ "mideleg",
+ "mie",
+ "mimpid",
+ "minstret",
+ "mintstatus",
+ "mip",
+ "misa",
+ "mnxti",
+ "mscratch",
+ "mscratchcsw",
+ "mscratchcswl",
+ "mstatus",
+ "mtinst",
+ "mtval",
+ "mtval2",
+ "mtvec",
+ "mtvt",
+ "mvendorid",
+ "pmpaddr0",
+ "pmpaddr1",
+ "pmpaddr10",
+ "pmpaddr11",
+ "pmpaddr12",
+ "pmpaddr13",
+ "pmpaddr14",
+ "pmpaddr15",
+ "pmpaddr2",
+ "pmpaddr3",
+ "pmpaddr4",
+ "pmpaddr5",
+ "pmpaddr6",
+ "pmpaddr7",
+ "pmpaddr8",
+ "pmpaddr9",
+ "pmpcfg0",
+ "pmpcfg1",
+ "pmpcfg2",
+ "pmpcfg3",
+ "satp",
+ "scause",
+ "scontext",
+ "scounteren",
+ "sedeleg",
+ "sentropy",
+ "sepc",
+ "sideleg",
+ "sie",
+ "sintstatus",
+ "sip",
+ "snxti",
+ "sscratch",
+ "sscratchcsw",
+ "sscratchcswl",
+ "sstatus",
+ "stval",
+ "stvec",
+ "stvt",
+ "tcontrol",
+ "tdata1",
+ "tdata2",
+ "tdata3",
+ "time",
+ "timeh",
+ "tinfo",
+ "tselect",
+ "ucause",
+ "uepc",
+ "uie",
+ "uintstatus",
+ "uip",
+ "unxti",
+ "uscratch",
+ "uscratchcsw",
+ "uscratchcswl",
+ "ustatus",
+ "utval",
+ "utvec",
+ "utvt",
+ "vcsr",
+ "vl",
+ "vlenb",
+ "vsatp",
+ "vscause",
+ "vsepc",
+ "vsie",
+ "vsip",
+ "vsscratch",
+ "vsstatus",
+ "vstart",
+ "vstval",
+ "vstvec",
+ "vtype",
+ "vxrm",
+ "vxsat",
+}
diff --git a/gentrace-spike.py b/gentrace-spike.py
new file mode 100755
index 0000000..7cea966
--- /dev/null
+++ b/gentrace-spike.py
@@ -0,0 +1,377 @@
+#! /usr/bin/env python3
+
+"""Elaborate a Spike trace for TBM."""
+
+import argparse
+import logging
+import re
+import sys
+from typing import IO, Optional, Sequence
+
+import flatbuffers
+
+# Generated by `flatc`.
+import FBInstruction.Instructions as FBInstrs
+
+import disassembler
+from instruction import Instruction
+import utilities
+from utilities import CallEvery
+from utilities import FileFormat
+
+
+logger = logging.getLogger("gentrace-spike")
+
+
+class ElaborateTrace:
+ """Elaborate a trace from a spike log file."""
+
+ def __init__(self, input_file: IO, output_file: IO,
+ output_format: FileFormat,
+ output_buffer_size: int,
+ functions: Optional[Sequence[Sequence[int]]]) -> None:
+ """Init.
+
+ Args:
+ input_file: an open log file
+ output_file: an open output file
+ output_format: generate json or flatbuffers output.
+ functions: optional list of PC ranges to include in the output.
+ """
+ self._input_file = input_file
+ self._output_file = output_file
+ self._output_format = output_format
+ self._functions = functions
+
+ # The current instruction being processed.
+ # See the curr_instr @property below.
+ self._curr_instr = None
+
+ # Buffer instructions before writing them to the output file.
+ self._instrs_buf = []
+ self._output_buffer_size = output_buffer_size
+
+ # The number of instructions included in the output trace (some might
+ # be buffered).
+ self.instr_count = 0
+
+ # If `discard_until` is set to some int, instructions from the trace
+ # are discarded until an instruction from address `discard_until` is
+ # read from the trace.
+ self._discard_until = None
+
+ # Precompile some REs:
+ self._rinstr = re.compile(
+ # pylint: disable-next=line-too-long
+ r"^core\s+(\d+):\s+0x([0-9a-fA-F]+)\s+\(0x([0-9a-fA-F]+)\)\s*(\S*)\s+(.*)$"
+ )
+ self._rstate = re.compile(
+ # pylint: disable-next=line-too-long
+ r"^core\s+(\d+):\s+3\s+0x([0-9a-fA-F]+)\s+\(0x([0-9a-fA-F]+)\)\s*(.*)$"
+ )
+ self._rexception = re.compile(r"^core\s+(\d+):\s+exception\s+(.*)$")
+ self._rcsr = re.compile(r"c\d+_")
+
+ self._rvloadwhole = re.compile(r"vl(\d)re(\d+).v")
+ self._rvstorewhole = re.compile(r"vs(\d)r.v")
+
+ @property
+ def curr_instr(self) -> Instruction:
+ return self._curr_instr
+
+    @curr_instr.setter
+    def curr_instr(self, instr: Instruction) -> None:
+        """Retire the pending instruction (if any) and make `instr` current."""
+        self.instr_count += 1
+        if self._curr_instr:
+            # RVC encodings (low opcode bits != 0b11) are 2 bytes long, not 4.
+            size = 4 if (self._curr_instr.opcode & 0b11) == 0b11 else 2
+            if instr.addr != self._curr_instr.addr + size:
+                self._curr_instr.branch_target = instr.addr
+            self.clear_curr_instr()
+        self._curr_instr = instr
+
+ def clear_curr_instr(self) -> None:
+ if self._curr_instr:
+ self._instrs_buf.append(self._curr_instr)
+
+ if len(self._instrs_buf) == self._output_buffer_size:
+ self.write_to_file()
+
+ self._curr_instr = None
+
+
+ def run(self) -> None:
+ for line in self._input_file:
+ if self.try_instruction(line):
+ continue
+
+ if self._discard_until:
+ continue
+
+ if self.try_state(line):
+ continue
+
+ if self.try_exception(line):
+ continue
+
+ # Flush out the instructions buffer.
+ self.clear_curr_instr()
+ if self._instrs_buf:
+ self.write_to_file()
+
+ def try_instruction(self, line: str) -> bool:
+ """Parse the first line of instruction execution."""
+ m = self._rinstr.match(line)
+ if m:
+ addr = int(m.group(2), 16)
+
+ if self._discard_until:
+ if addr == self._discard_until:
+ # We reached the desired location, stop discarding
+ # instructions.
+ self._discard_until = None
+ else:
+ # Discard this instruction
+ return True
+
+ if self._functions is not None and all(
+ addr not in r for r in self._functions):
+ # This instruction is not to be included in the output.
+ self.clear_curr_instr()
+ return True
+
+ opcode = int(m.group(3), 16)
+ mnemonic = m.group(4)
+ ops = m.group(5).split(", ") if m.group(5) else []
+ (inputs, outputs) = disassembler.asm_registers(mnemonic, ops)
+ new_instr = Instruction(addr=addr,
+ opcode=opcode,
+ mnemonic=mnemonic,
+ operands=ops,
+ inputs=inputs,
+ outputs=outputs,
+ is_nop=disassembler.is_nop(mnemonic),
+ is_branch=disassembler.is_branch(mnemonic),
+ branch_target=None,
+ is_flush=disassembler.is_flush(mnemonic),
+ is_vctrl=disassembler.is_vctrl(mnemonic),
+ loads=[],
+ stores=[],
+ lmul=None,
+ sew=None,
+ vl=None)
+ self.curr_instr = new_instr
+ return True
+
+ return False
+
+    def try_state(self, line: str) -> bool:
+        """Parse the state accesses of the instruction."""
+        m = self._rstate.match(line)
+        if m:
+            if self.curr_instr is None:
+                return True
+
+            changes = m.group(4)
+
+            # Apply substitutions to convert non-architectural names for CSRs
+            # such as "c8_vstart" to architectural name "vstart".
+            changes = self._rcsr.sub(r"", changes)
+
+            # Most entries are simple pairs of "<register> <new value>"
+            # but there are exceptions such as vtype changes, memory writes
+            # (triples that include the value written), etc.
+            # So we parse the entries one at a time
+            changes = changes.split()
+            i = 0
+            while i < len(changes):
+                r = changes[i]
+
+                if r == "mem":
+                    # memory access
+                    if (len(changes) > i + 2 and
+                            changes[i + 2].startswith("0x")):
+                        # memory write
+                        addr = int(changes[i + 1], 16)
+                        self.curr_instr.stores.append(addr)
+                        i += 3
+                    else:
+                        # memory read
+                        addr = int(changes[i + 1], 16)
+                        self.curr_instr.loads.append(addr)
+                        i += 2
+
+                elif r in ["m1", "m2", "m4", "m8"]:
+                    mv = self._rvloadwhole.match(self.curr_instr.mnemonic)
+                    if mv:
+                        # load whole vector register
+                        self.curr_instr.lmul = int(mv.group(1))
+                    else:
+                        mv = self._rvstorewhole.match(self.curr_instr.mnemonic)
+                        if mv:
+                            # store whole vector register
+                            self.curr_instr.lmul = int(mv.group(1))
+                        else:
+                            # vector LMUL
+                            self.curr_instr.lmul = int(r[1])
+                    i += 1
+
+                elif r in ["mf8", "mf4", "mf2"]:
+                    # vector fractional LMUL
+                    self.curr_instr.lmul = 1 / int(r[2])
+                    i += 1
+
+                elif r in ["e8", "e16", "e32", "e64"]:
+                    # vector element size
+                    self.curr_instr.sew = int(r[1:])
+                    i += 1
+
+                elif (r.startswith("x") or r.startswith("v") or
+                      r.startswith("f") or r in disassembler.CSRS):
+                    # register write
+                    # TODO(sflur): collect those as outputs
+                    i += 2
+
+                elif r.startswith("l"):
+                    # vector length
+                    self.curr_instr.vl = int(r[1:])
+                    i += 1
+
+                else:
+                    # Fail loudly: `assert` vanishes under -O and would spin.
+                    raise ValueError(f"unexpected trace token {r!r}")
+
+            return True
+
+        return False
+
+    def try_exception(self, line: str) -> bool:
+        """Parse an exception line; skip the trace until the handler returns."""
+        m = self._rexception.match(line)
+        if m:
+            if self.curr_instr is None:
+                return True
+            # We don't want to process the exception handling.
+            # TODO(sflur): there might be exceptions we do want to process?
+            # Note: some exceptions are Spike artifacts, e.g. `write_tohost`.
+            # TODO(sflur): instead of using `addr` we can extract epc from
+            # `line` (note that epc points to the ecall instruction, and not to
+            # the next instruction).
+            # Resume after the trapping instruction (RVC encodings are 2 bytes).
+            size = 4 if (self.curr_instr.opcode & 0b11) == 0b11 else 2
+            self._discard_until = self.curr_instr.addr + size
+            self.clear_curr_instr()
+            return True
+
+        return False
+
+ def write_to_file(self) -> None:
+ if self._output_format == FileFormat.JSON:
+ instrs = [i.to_json() for i in self._instrs_buf]
+ print("\n".join(instrs), file=self._output_file)
+
+ else:
+ assert self._output_format == FileFormat.FLATBUFFERS
+
+ builder = flatbuffers.Builder()
+ instrs = [i.fb_build(builder) for i in self._instrs_buf]
+
+ FBInstrs.StartInstructionsVector(builder, len(instrs))
+ for x in reversed(instrs):
+ builder.PrependUOffsetTRelative(x)
+ instrs = builder.EndVector()
+
+ FBInstrs.Start(builder)
+ FBInstrs.AddInstructions(builder, instrs)
+ instrs = FBInstrs.End(builder)
+
+ builder.Finish(instrs)
+ buf = builder.Output()
+ self._output_file.write(len(buf).to_bytes(4, byteorder="little"))
+ self._output_file.write(buf)
+
+ # MutableSequence has no clear function
+ del self._instrs_buf[0:]
+
+
+def get_parser() -> argparse.ArgumentParser:
+ """Return a command line parser."""
+ parser = argparse.ArgumentParser(
+ description=__doc__,
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+
+ parser.add_argument("--cycles",
+ type=int,
+ help="Maximum length of trace",
+ metavar="N")
+
+ parser.add_argument("--outfile",
+ default="out.trace",
+ help="Output file.",
+ metavar="OFILE")
+
+ parser.add_argument("--json",
+ action="store_true",
+ help="Write the trace as a sequence of json objects"
+ " (instead of flat-buffers).")
+
+ parser.add_argument("--output-buffer-size",
+ type=int,
+ default=100000,
+ help="For efficiency, a buffer in memory collects N"
+ " processed instructions, and write all of them to the"
+ " output together.",
+ metavar="N",
+ dest="output_buffer_size")
+
+ # The -v flag is setup so that verbose holds the number of times the flag
+ # was used. This is the standard way to use -v, even though at the moment
+ # we have only two levels of verbosity: warning (the default, with no -v),
+ # and info.
+ parser.add_argument("--verbose", "-v",
+ default=0,
+ action="count",
+ help="Increase the verbosity level. By default only"
+ " errors and warnings will show. Use '-v' to also show"
+ " information messages.")
+
+ parser.add_argument("input_file",
+ help="ELF file or log file.",
+ metavar="IFILE")
+
+ return parser
+
+
+def main(argv: Sequence[str]) -> int:
+ parser = get_parser()
+ args = parser.parse_args(argv)
+
+ log_level = logging.WARNING
+ if args.verbose > 0:
+ log_level = logging.INFO
+
+ utilities.logging_config(log_level)
+
+ with open(args.input_file, "r", encoding="ascii") as input_file:
+ if args.json:
+ fmt = FileFormat.JSON
+ mode = "w"
+ encoding = "ascii"
+ else:
+ fmt = FileFormat.FLATBUFFERS
+ mode = "wb"
+ encoding = None
+
+ with open(args.outfile, mode, encoding=encoding) as output_file:
+ gen = ElaborateTrace(input_file, output_file, fmt,
+ args.output_buffer_size, None)
+ with CallEvery(30, lambda: logger.info("processed %d instructions",
+ gen.instr_count)):
+ gen.run()
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main(sys.argv[1:]))
diff --git a/instruction.fbs b/instruction.fbs
new file mode 100644
index 0000000..3a4aa35
--- /dev/null
+++ b/instruction.fbs
@@ -0,0 +1,24 @@
+namespace FBInstruction;
+
+table Instruction {
+ addr : uint64;
+ opcode: uint32;
+ mnemonic: string;
+ operands: [string];
+ inputs: [string];
+ outputs: [string];
+ is_nop: bool;
+ is_branch: bool;
+ branch_target: uint64; // 0 means not set
+ is_flush: bool;
+ is_vctrl: bool;
+ loads: [uint64];
+ stores: [uint64];
+ lmul: float32; // valid values: 1/8, 1/4, 1/2, 1, 2, 4, 8, and 0 which means not set
+ sew: uint8; // 0 means not set
+ vl: int16; // -1 means not set
+}
+
+table Instructions { instructions:[Instruction]; }
+
+root_type Instructions;
diff --git a/instruction.py b/instruction.py
new file mode 100644
index 0000000..5003f9d
--- /dev/null
+++ b/instruction.py
@@ -0,0 +1,296 @@
+"""Classes shared by trace generators and TBMs."""
+
+from __future__ import annotations
+
+import collections
+import dataclasses
+from dataclasses import dataclass
+import json
+import re
+from typing import Any, Dict, List, Optional, Sequence, Union
+
+# Generated by `flatc`.
+import FBInstruction.Instruction as FBInstr
+
+
+@dataclass(slots=True, kw_only=True)
+class Instruction:
+ """Class representing an instruction.
+
+ This is an architecture-neutral representation of an instruction.
+ Architecture specific code involving RISC-V, Arm, x86, etc. should go in
+ trace generation and disassembly.
+ """
+
+ addr: int
+ opcode: int
+ mnemonic: str
+ operands: Sequence[str]
+ inputs: Sequence[str]
+ outputs: Sequence[str]
+ is_nop: bool
+ is_branch: bool
+ branch_target: Optional[Sequence[int]]
+ is_flush: bool
+ is_vctrl: bool
+ loads: List[int]
+ stores: List[int]
+ lmul: Optional[Union[int, float]]
+ sew: Optional[int]
+ vl: Optional[int]
+
+ inputs_by_type_cache: Optional[Dict[str, Sequence[str]]] = None
+ outputs_by_type_cache: Optional[Dict[str, Sequence[str]]] = None
+
+ def __eq__(self, other) -> bool:
+ return id(self) == id(other)
+
+ def __hash__(self) -> int:
+ return id(self)
+
+ def to_json(self):
+ # For efficiency, and to match the FB:
+ inputs_by_type_cache = self.inputs_by_type_cache
+ self.inputs_by_type_cache = None
+ outputs_by_type_cache = self.outputs_by_type_cache
+ self.outputs_by_type_cache = None
+
+ res = json.dumps(dataclasses.asdict(self))
+
+ self.inputs_by_type_cache = inputs_by_type_cache
+ self.outputs_by_type_cache = outputs_by_type_cache
+
+ return res
+
+ @classmethod
+ def from_json(cls, s: str) -> Instruction:
+ """Parse JSON string to Instruction."""
+ d = json.loads(s)
+ return cls(**d)
+
+    def fb_build(self, builder) -> Any:
+        """Build the FlatBuffers table for this instruction in `builder`."""
+        mnemonic = builder.CreateString(self.mnemonic)
+
+        operands = [builder.CreateString(x) for x in self.operands]
+        FBInstr.StartOperandsVector(builder, len(operands))
+        for x in reversed(operands):
+            builder.PrependUOffsetTRelative(x)
+        operands = builder.EndVector()
+
+        inputs = [builder.CreateString(x) for x in self.inputs]
+        FBInstr.StartInputsVector(builder, len(inputs))
+        for x in reversed(inputs):
+            builder.PrependUOffsetTRelative(x)
+        inputs = builder.EndVector()
+
+        outputs = [builder.CreateString(x) for x in self.outputs]
+        FBInstr.StartOutputsVector(builder, len(outputs))
+        for x in reversed(outputs):
+            builder.PrependUOffsetTRelative(x)
+        outputs = builder.EndVector()
+
+        FBInstr.StartLoadsVector(builder, len(self.loads))
+        for l in reversed(self.loads):
+            builder.PrependUint64(l)
+        loads = builder.EndVector()
+
+        FBInstr.StartStoresVector(builder, len(self.stores))
+        for s in reversed(self.stores):
+            builder.PrependUint64(s)
+        stores = builder.EndVector()
+
+        FBInstr.Start(builder)
+        FBInstr.AddAddr(builder, self.addr)
+        FBInstr.AddOpcode(builder, self.opcode)
+        FBInstr.AddMnemonic(builder, mnemonic)
+        FBInstr.AddOperands(builder, operands)
+        FBInstr.AddInputs(builder, inputs)
+        FBInstr.AddOutputs(builder, outputs)
+        FBInstr.AddIsNop(builder, self.is_nop)
+        FBInstr.AddIsBranch(builder, self.is_branch)
+        FBInstr.AddBranchTarget(builder, self.branch_target or 0)
+        FBInstr.AddIsFlush(builder, self.is_flush)
+        FBInstr.AddIsVctrl(builder, self.is_vctrl)
+        FBInstr.AddLoads(builder, loads)
+        FBInstr.AddStores(builder, stores)
+        FBInstr.AddLmul(builder, float(self.lmul) if self.lmul else 0.0)
+        FBInstr.AddSew(builder, self.sew if self.sew else 0)
+        # vl == 0 is a legal value; only None maps to the -1 "not set" mark.
+        FBInstr.AddVl(builder, self.vl if self.vl is not None else -1)
+        return FBInstr.End(builder)
+
+    @classmethod
+    def from_fb(cls, buf) -> Instruction:
+        """Build an Instruction from a FlatBuffers Instruction table."""
+        operands = ([
+            buf.Operands(i).decode("utf-8") for i in range(buf.OperandsLength())
+        ] if not buf.OperandsIsNone() else [])
+
+        inputs = ([
+            buf.Inputs(i).decode("utf-8") for i in range(buf.InputsLength())
+        ] if not buf.InputsIsNone() else [])
+
+        outputs = ([
+            buf.Outputs(i).decode("utf-8") for i in range(buf.OutputsLength())
+        ] if not buf.OutputsIsNone() else [])
+
+        loads = ([buf.Loads(i) for i in range(buf.LoadsLength())]
+                 if not buf.LoadsIsNone() else [])
+
+        stores = ([buf.Stores(i) for i in range(buf.StoresLength())]
+                  if not buf.StoresIsNone() else [])
+
+        # Map the schema's "not set" marks (0, 0.0, -1) back to None.
+        branch_target = buf.BranchTarget() or None
+
+        lmul = buf.Lmul()
+        if lmul == 0.0:
+            lmul = None
+        elif lmul >= 1:
+            lmul = int(lmul)
+
+        return cls(addr=buf.Addr(),
+                   opcode=buf.Opcode(),
+                   mnemonic=buf.Mnemonic().decode("utf-8"),
+                   operands=operands,
+                   inputs=inputs,
+                   outputs=outputs,
+                   is_nop=buf.IsNop(),
+                   is_branch=buf.IsBranch(),
+                   branch_target=branch_target,
+                   is_flush=buf.IsFlush(),
+                   is_vctrl=buf.IsVctrl(),
+                   loads=loads,
+                   stores=stores,
+                   lmul=lmul,
+                   sew=buf.Sew() or None,
+                   vl=buf.Vl() if buf.Vl() >= 0 else None)
+
+ def __str__(self) -> str:
+ return f"(0x{self.addr:x}) {self.mnemonic} {','.join(self.operands)}"
+
+ def max_emul(self) -> Union[int, float]:
+ """Compute the biggest emul of all the vector registers for the
+ instruction.
+
+ Returns:
+ biggest emul.
+ """
+
+ if self.lmul is not None:
+ if (self.mnemonic.startswith("vw") or
+ self.mnemonic.startswith("vfw") or
+ self.mnemonic.startswith("vn") or
+ self.mnemonic.startswith("vfn")):
+ return 2 * self.lmul
+
+ return self.lmul
+
+ return 1
+
+ def inputs_by_type(self) -> Dict[str, Sequence[str]]:
+ if self.inputs_by_type_cache is None:
+ self.inputs_by_type_cache = sort_regs_by_type(self.inputs)
+ return self.inputs_by_type_cache
+
+ def outputs_by_type(self) -> Dict[str, Sequence[str]]:
+ if self.outputs_by_type_cache is None:
+ self.outputs_by_type_cache = sort_regs_by_type(self.outputs)
+ return self.outputs_by_type_cache
+
+ def conflicts_with(self, other) -> bool:
+ """Are there RAW/WAR/WAW conflicts between the instructions?
+
+ NOTE: inputs and outputs don't include all the vector registers (e.g.
+ when LMUL > 1, vl8re8.v, vs8r.v), but this is not a problem here,
+ because only vector instructions access vector registers, and we don't
+ care if those are conflicting with each other, as they all go to the
+ same dispatch queue.
+
+ Args:
+ other: the other instruction to check conflicts with.
+
+ Returns:
+ True if the instructions have conflicts, otherwise False.
+ """
+
+ return (overlaps(self.inputs, other.outputs) or
+ overlaps(self.outputs, other.inputs) or
+ overlaps(self.outputs, other.outputs))
+
+
+def sort_regs_by_type(regs: Sequence[str]) -> Dict[str, Sequence[str]]:
+ res = {}
+ for reg in regs:
+ rf = register_type(reg)
+ res.setdefault(rf, collections.deque()).append(reg)
+ return res
+
+
+def overlaps(xs: Sequence[str], ys: Sequence[str]) -> bool:
+ return any(x in ys for x in xs)
+
+
+RE_VREG = re.compile(r"v\d+$")
+RE_FREG = re.compile(r"f\d+$")
+RE_XREG = re.compile(r"x\d+$")
+
+
+def is_vector_register(r: str) -> bool:
+ """Test whether a register is a vector register."""
+ # TODO(sflur): ideally this RISC-V specific code would be somewhere else
+ return bool(RE_VREG.match(r))
+
+
+def is_float_register(r: str) -> bool:
+ """Test whether a register is a floating point register."""
+ # TODO(sflur): ideally this RISC-V specific code would be somewhere else
+ return bool(RE_FREG.match(r))
+
+
+def is_int_register(r: str) -> bool:
+ """Test whether a register is an integer (general purpose) register."""
+ # TODO(sflur): ideally this RISC-V specific code would be somewhere else
+ # note that this code assumes that ABI names like "ra" have been replaced
+ # with architectural names like "x1"
+ return bool(RE_XREG.match(r))
+
+
+def register_type(r: str) -> str:
+ """Assign a register to a register class.
+
+ Args:
+ r: register id
+
+ Returns:
+ - register file name
+
+ Classes are
+ - "V": vector RF
+ - "F": float RF
+ - "X": general purpose (integer) RF
+ - "MISC": everything else
+
+ The MISC class includes various status and control registers
+ and we might want to split it into finer divisions.
+ """
+
+ # TODO(sflur): instead of using hardcoded predicates this should be somehow
+ # configurable. There's already something in the uarch yaml file (see
+ # register_files), but I suspect using that will make things much slower. A
+ # possible solution is to hardcode some generic predicates (e.g. a
+ # letter+number) and configure those in the yaml. Also, I don't think a
+ # single config file should be used for gentrace and tbm, as gentrace
+ # config is more fixed.
+
+ if is_int_register(r):
+ return "X"
+
+ if is_vector_register(r):
+ return "V"
+
+ if is_float_register(r):
+ return "F"
+
+ return "MISC"