| # Copyright lowRISC contributors. |
| # Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| from typing import Dict, Iterator, List, Optional, Tuple |
| |
| from .isa import OTBNInsn |
| from .state import OTBNState |
| from .stats import ExecutionStats |
| from .trace import Trace |
| |
| |
| # A dictionary that defines a function of the form "address -> from -> to". If |
| # PC is the current PC and cnt is the count for the innermost loop then |
| # warps[PC][cnt] = new_cnt means that we should warp the current count to |
| # new_cnt. |
| LoopWarps = Dict[int, Dict[int, int]] |
| |
| |
| class OTBNSim: |
| def __init__(self) -> None: |
| self.state = OTBNState() |
| self.program = [] # type: List[OTBNInsn] |
| self.loop_warps = {} # type: LoopWarps |
| self.stats = None # type: Optional[ExecutionStats] |
| self._execute_generator = None # type: Optional[Iterator[None]] |
| self._next_insn = None # type: Optional[OTBNInsn] |
| |
| def load_program(self, program: List[OTBNInsn]) -> None: |
| self.program = program.copy() |
| |
| def add_loop_warp(self, addr: int, from_cnt: int, to_cnt: int) -> None: |
| '''Add a new loop warp to the simulation''' |
| self.loop_warps.setdefault(addr, {})[from_cnt] = to_cnt |
| |
| def load_data(self, data: bytes) -> None: |
| self.state.dmem.load_le_words(data) |
| |
| def start(self, collect_stats: bool) -> None: |
| '''Prepare to start the execution. |
| |
| Use run() or step() to actually execute the program. |
| |
| ''' |
| self.stats = ExecutionStats(self.program) if collect_stats else None |
| self._execute_generator = None |
| self._next_insn = None |
| self.state.start() |
| |
| def _fetch(self, pc: int) -> OTBNInsn: |
| word_pc = pc >> 2 |
| if word_pc >= len(self.program): |
| raise RuntimeError('Trying to execute instruction at address ' |
| '{:#x}, but the program is only {:#x} ' |
| 'bytes ({} instructions) long. Since there ' |
| 'are no architectural contents of the ' |
| 'memory here, we have to stop.' |
| .format(pc, |
| 4 * len(self.program), |
| len(self.program))) |
| |
| return self.program[word_pc] |
| |
| def _on_stall(self, |
| verbose: bool, |
| fetch_next: bool) -> List[Trace]: |
| '''This is run on a stall cycle''' |
| changes = self.state.changes() |
| self.state.commit(sim_stalled=True) |
| if fetch_next: |
| self._next_insn = self._fetch(self.state.pc) |
| if self.stats is not None: |
| self.stats.record_stall() |
| if verbose: |
| self._print_trace(self.state.pc, '(stall)', changes) |
| return changes |
| |
| def _on_retire(self, |
| verbose: bool, |
| insn: OTBNInsn) -> List[Trace]: |
| '''This is run when an instruction completes''' |
| assert self._execute_generator is None |
| self.state.post_insn(self.loop_warps.get(self.state.pc, {})) |
| |
| if self.stats is not None: |
| self.stats.record_insn(insn, self.state) |
| |
| if self.state.pending_halt: |
| # We've reached the end of the run (either because of an ECALL |
| # instruction or an error). |
| self.state.stop() |
| |
| changes = self.state.changes() |
| |
| # Program counter before commit |
| pc_before = self.state.pc |
| self.state.commit(sim_stalled=False) |
| |
| # Fetch the next instruction |
| self._next_insn = self._fetch(self.state.pc) |
| |
| disasm = insn.disassemble(pc_before) |
| if verbose: |
| self._print_trace(pc_before, disasm, changes) |
| |
| return changes |
| |
| def step(self, verbose: bool) -> Tuple[Optional[OTBNInsn], List[Trace]]: |
| '''Run a single cycle. |
| |
| Returns the instruction, together with a list of the architectural |
| changes that have happened. If the model isn't currently running, |
| returns no instruction and no changes. |
| |
| ''' |
| if not self.state.running: |
| return (None, []) |
| |
| if self.state.non_insn_stall: |
| # Zero INSN_CNT the cycle after we are told to start (and every |
| # cycle after that until we start executing instructions, but that |
| # doesn't really matter) |
| changes = self._on_stall(verbose, fetch_next=False) |
| self.state.ext_regs.write('INSN_CNT', 0, True) |
| return (None, changes) |
| |
| insn = self._next_insn |
| if insn is None: |
| return (None, self._on_stall(verbose, fetch_next=True)) |
| |
| if self._execute_generator is None: |
| # This is the first cycle for an instruction. Run any setup for |
| # the state object and then start running the instruction |
| # itself. |
| self.state.pre_insn(insn.affects_control) |
| |
| # Either execute the instruction directly (if it is a |
| # single-cycle instruction without a `yield` in execute()), or |
| # return a generator for multi-cycle instructions. Note that |
| # this doesn't consume the first yielded value. |
| self._execute_generator = insn.execute(self.state) |
| |
| if self._execute_generator is not None: |
| # This is a cycle for a multi-cycle instruction (which possibly |
| # started just above) |
| try: |
| next(self._execute_generator) |
| except StopIteration: |
| self._execute_generator = None |
| |
| sim_stalled = (self._execute_generator is not None) |
| if not sim_stalled: |
| return (insn, self._on_retire(verbose, insn)) |
| |
| return (None, self._on_stall(verbose, fetch_next=True)) |
| |
| def dump_data(self) -> bytes: |
| return self.state.dmem.dump_le_words() |
| |
| def _print_trace(self, pc: int, disasm: str, changes: List[Trace]) -> None: |
| '''Print a trace of the current instruction''' |
| changes_str = ', '.join([t.trace() for t in changes]) |
| print('{:08x} | {:45} | [{}]'.format(pc, disasm, changes_str)) |