| """Trace module.""" |
| |
| from typing import Any, IO, Optional |
| |
| # Generated by `flatc`. |
| import FBInstruction.Instructions as FBInstrs |
| from instruction import Instruction |
| import tbm_options |
| from utilities import FileFormat |
| |
class FunctionalTrace:
    """Buffered reader over a functional simulator trace.

    Instructions are read from the input file in batches and kept in
    `self.instrs` in reverse order, so the next instruction to hand out is
    always the last element of the list.

    Attributes:
        input_file: the trace file being read.
        input_format: format of `input_file` (JSON or FlatBuffers).
        read_instructions: bound batch reader matching `input_format`.
        end: number of instructions (counted from the start of the trace,
            skipped ones included) after which the trace reports EOF, or
            None for no limit.
        instr_count: number of instructions consumed so far, including the
            ones that were skipped.
        instrs: currently buffered instructions, next one last.
    """

    def __init__(self, input_file: IO[Any], input_format: FileFormat,
                 instructions_range: str) -> None:
        """Read the first batch and skip to the start of the requested range.

        Args:
            input_file: open trace file. It is read line by line for JSON
                and as length-prefixed binary chunks for FlatBuffers.
            input_format: `FileFormat.JSON` or `FileFormat.FLATBUFFERS`.
            instructions_range: "START[:END]". The first START instructions
                are skipped, and the trace reports EOF once END instructions
                (skipped ones included) have been consumed. An empty START
                means 0; an empty or missing END means no limit.
        """
        self.input_file = input_file

        self.input_format = input_format
        if input_format == FileFormat.JSON:
            self.read_instructions = self.read_json_instructions
        else:
            assert input_format == FileFormat.FLATBUFFERS
            self.read_instructions = self.read_fb_instructions

        start, _, end = instructions_range.partition(":")
        self.end = int(end) if end else None

        self.instr_count = 0
        self.instrs = []
        self.read_instructions()
        # An empty START (e.g. ":100") means "from the first instruction",
        # mirroring how an empty END is already accepted above.
        self.skip(int(start) if start else 0)

    @classmethod
    def from_json(cls, input_file: IO[Any], instructions_range: str):
        """Build a trace that reads `input_file` as JSON, one line per instruction."""
        return cls(input_file, FileFormat.JSON, instructions_range)

    @classmethod
    def from_fb(cls, input_file: IO[Any], instructions_range: str):
        """Build a trace that reads `input_file` as FlatBuffers chunks."""
        return cls(input_file, FileFormat.FLATBUFFERS, instructions_range)

    def next_addr(self) -> Optional[int]:
        """Return the address of the next buffered instruction, or None if none is buffered."""
        return self.instrs[-1].addr if self.instrs else None

    def dequeue(self) -> Optional[Instruction]:
        """Remove and return the next instruction, or None at end of trace."""
        if self.eof():
            return None

        instr = self.instrs.pop()
        if not self.instrs:
            # Refill eagerly so that eof() and next_addr() stay accurate.
            self.read_instructions()
        self.instr_count += 1

        return instr

    def skip(self, n: int) -> None:
        """Consume the next `n` instructions without returning them.

        Does nothing when the trace is already at EOF or `n` is not positive.
        Skipped instructions are added to `instr_count`.
        """
        if self.eof() or n <= 0:
            return

        if len(self.instrs) >= n:
            self._discard_buffered(n)
            return

        # The whole buffer is skipped; the remainder comes from the file.
        n -= len(self.instrs)
        self.instr_count += len(self.instrs)
        self.instrs.clear()

        if self.input_format == FileFormat.JSON:
            # One instruction per line: drop the lines without parsing them.
            for _ in range(n):
                self.input_file.readline()
            self.instr_count += n
            self.read_instructions()
            return

        assert self.input_format == FileFormat.FLATBUFFERS

        # Chunks must be decoded to learn how many instructions they hold;
        # keep reading until one is large enough to cover the rest of the skip.
        while len(self.instrs) < n:
            n -= len(self.instrs)
            self.instr_count += len(self.instrs)
            self.instrs.clear()
            # TODO(sflur): self.read_fb_instructions() constructs the
            # instructions but we don't use most of them.
            self.read_fb_instructions()
            if not self.instrs:
                # The file ended before the skip was complete.
                return

        # Here 0 < n <= len(self.instrs). If the skip ends exactly on a chunk
        # boundary the buffer empties and must be refilled, otherwise eof()
        # would report end of trace while chunks remain in the file.
        self._discard_buffered(n)

    def _discard_buffered(self, n: int) -> None:
        """Drop the next `n` buffered instructions (0 < n <= len(self.instrs)).

        Refills the buffer from the file when it becomes empty.
        """
        self.instr_count += n
        del self.instrs[-n:]
        if not self.instrs:
            self.read_instructions()

    def eof(self) -> bool:
        """Return True when nothing is buffered or the END limit has been reached."""
        return (not self.instrs or
                (self.end is not None and self.instr_count >= self.end))

    def read_json_instructions(self) -> None:
        """Replace the buffer with the next batch of JSON-encoded instructions.

        Reads up to `tbm_options.args.json_trace_buffer_size` lines, stopping
        early at end of file. The buffer ends up empty when no lines remain.
        """
        batch = []
        for _ in range(tbm_options.args.json_trace_buffer_size):
            line = self.input_file.readline()
            if not line:
                break
            batch.append(Instruction.from_json(line))
        # Stored reversed so that the next instruction is the last element.
        batch.reverse()
        self.instrs = batch

    def read_fb_instructions(self) -> None:
        """Replace the buffer with the next FlatBuffers chunk of instructions.

        A chunk is a 4-byte little-endian length followed by that many bytes
        holding a FlatBuffers `Instructions` root. At end of file the buffer
        is left unchanged.
        """
        header = self.input_file.read(4)
        if not header:
            return

        size = int.from_bytes(header, byteorder="little")

        buf = bytearray(self.input_file.read(size))
        chunk = FBInstrs.Instructions.GetRootAsInstructions(buf, 0)
        # Stored reversed so that the next instruction is the last element.
        self.instrs = [
            Instruction.from_fb(chunk.Instructions(i))
            for i in reversed(range(chunk.InstructionsLength()))
        ]