# Source blob: 20bac52cc52fb9c4fcf5117658421c3d8679ad88 (repository viewer header)
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
from typing import Dict, Iterator, List, Optional, Tuple
from .decode import EmptyInsn
from .isa import OTBNInsn
from .state import OTBNState, FsmState
from .stats import ExecutionStats
from .trace import Trace
from .ext_regs import TraceExtRegChange, ExtRegChange
# A dictionary that defines a function of the form "address -> from -> to". If
# PC is the current PC and cnt is the count for the innermost loop then
# warps[PC][cnt] = new_cnt means that we should warp the current count to
# new_cnt.
#
# Entries are registered with OTBNSim.add_loop_warp(addr, from_cnt, to_cnt).
# When an instruction retires, the inner dictionary for the current PC (or an
# empty dictionary if there is none) is handed to OTBNState.post_insn().
LoopWarps = Dict[int, Dict[int, int]]
class OTBNSim:
    '''Driver that steps an OTBNState through a loaded program.

    The object owns the model state, the list of instructions to run, any
    registered loop warps and (optionally) an ExecutionStats collector. Call
    load_program() / load_data() to set things up, start() to begin a run and
    then step() once per cycle.
    '''

    def __init__(self) -> None:
        self.state = OTBNState()
        self.program = []  # type: List[OTBNInsn]
        self.loop_warps = {}  # type: LoopWarps
        self.stats = None  # type: Optional[ExecutionStats]

        # Generator returned by execute() for a multi-cycle instruction that
        # is currently in flight, or None if no such instruction is running.
        self._execute_generator = None  # type: Optional[Iterator[None]]

        # The instruction fetched on a previous cycle that should be executed
        # next, or None if nothing has been fetched yet.
        self._next_insn = None  # type: Optional[OTBNInsn]

    def load_program(self, program: List[OTBNInsn]) -> None:
        '''Replace the current program with a shallow copy of program.'''
        self.program = program.copy()

    def add_loop_warp(self, addr: int, from_cnt: int, to_cnt: int) -> None:
        '''Add a new loop warp to the simulation

        After this call, loop_warps[addr][from_cnt] == to_cnt. A later call
        with the same addr and from_cnt overrides the earlier one.
        '''
        self.loop_warps.setdefault(addr, {})[from_cnt] = to_cnt

    def load_data(self, data: bytes, has_validity: bool) -> None:
        '''Load bytes into DMEM, starting at address zero.

        If has_validity is true, each 32-bit word should be represented by 5
        bytes (1 byte that says whether the word is valid, then 4 bytes that
        give the word in little-endian format). If has_validity is false, each
        word is considered valid and is represented by 4 bytes in little-endian
        format.
        '''
        self.state.dmem.load_le_words(data, has_validity)

    def start(self, collect_stats: bool) -> None:
        '''Prepare to start the execution.

        Use run() or step() to actually execute the program.

        If collect_stats is true, a fresh ExecutionStats object is created for
        the current program; otherwise self.stats is cleared. Any in-flight
        multi-cycle instruction and any previously fetched instruction are
        forgotten.
        '''
        self.stats = ExecutionStats(self.program) if collect_stats else None
        self._execute_generator = None
        self._next_insn = None
        self.state.start()

    def _fetch(self, pc: int) -> OTBNInsn:
        '''Return the instruction at byte address pc.

        Raises a RuntimeError if pc lies outside the loaded program (either
        past its end or negative). If the state reports that IMEM has been
        invalidated, an EmptyInsn for pc is returned instead of the program
        contents.
        '''
        word_pc = pc >> 2

        # A negative index would silently wrap around to the end of the
        # program list, so reject it together with the past-the-end case.
        if not 0 <= word_pc < len(self.program):
            raise RuntimeError('Trying to execute instruction at address '
                               '{:#x}, but the program is only {:#x} '
                               'bytes ({} instructions) long. Since there '
                               'are no architectural contents of the '
                               'memory here, we have to stop.'
                               .format(pc,
                                       4 * len(self.program),
                                       len(self.program)))

        if self.state.invalidated_imem:
            return EmptyInsn(pc)

        return self.program[word_pc]

    def _on_stall(self,
                  verbose: bool,
                  fetch_next: bool) -> List[Trace]:
        '''This is run on a stall cycle

        Collects the pending changes from the state, commits them as a stalled
        cycle and returns them. If fetch_next is true, the instruction at the
        (post-commit) PC is fetched so that it can run on a later cycle.
        '''
        changes = self.state.changes()
        self.state.commit(sim_stalled=True)

        if fetch_next:
            self._next_insn = self._fetch(self.state.pc)

        if self.stats is not None:
            self.stats.record_stall()

        if verbose:
            self._print_trace(self.state.pc, '(stall)', changes)

        return changes

    def _on_retire(self,
                   verbose: bool,
                   insn: OTBNInsn) -> List[Trace]:
        '''This is run when an instruction completes

        Returns the list of changes caused by the instruction. As a side
        effect, commits the state and sets up self._next_insn for the
        following cycle.
        '''
        assert self._execute_generator is None
        self.state.post_insn(self.loop_warps.get(self.state.pc, {}))

        if self.stats is not None:
            self.stats.record_insn(insn, self.state)

        if self.state.pending_halt:
            # We've reached the end of the run (either because of an ECALL
            # instruction or an error).
            self.state.stop()

        changes = self.state.changes()

        # Program counter before commit
        pc_before = self.state.pc
        self.state.commit(sim_stalled=False)

        # Fetch the next instruction unless this instruction had
        # `has_fetch_stall` set, in which case we inject a single cycle stall.
        self._next_insn = (None if insn.has_fetch_stall
                           else self._fetch(self.state.pc))

        # The disassembly is only needed for the trace line, so don't bother
        # generating it on every retired instruction when we're not verbose.
        if verbose:
            self._print_trace(pc_before, insn.disassemble(pc_before), changes)

        return changes

    def step(self, verbose: bool) -> Tuple[Optional[OTBNInsn], List[Trace]]:
        '''Run a single cycle.

        Returns the instruction, together with a list of the architectural
        changes that have happened. If the model isn't currently running,
        returns no instruction and no changes.

        The instruction is only returned on the cycle where it retires; on
        stall cycles (including the intermediate cycles of a multi-cycle
        instruction) the first element of the result is None.
        '''
        if not self.state.running():
            return (None, [])

        if self.state.fsm_state == FsmState.PRE_EXEC:
            # Zero INSN_CNT the cycle after we are told to start (and every
            # cycle after that until we start executing instructions, but that
            # doesn't really matter)
            changes = self._on_stall(verbose, fetch_next=False)

            # Also report a RND_REQ change in the returned trace. This is added
            # after _on_stall(), so it doesn't appear in the verbose output.
            changes.append(TraceExtRegChange('RND_REQ',
                                             ExtRegChange('=', 0, True, 0)))
            self.state.ext_regs.write('INSN_CNT', 0, True)
            return (None, changes)

        # If we are not in PRE_EXEC, then we have a valid URND seed. So we
        # should step URND nevertheless.
        self.state.wsrs.URND.commit()
        self.state.wsrs.URND.step()

        if self.state.fsm_state == FsmState.FETCH_WAIT:
            # We still aren't executing instructions: stall for this cycle and
            # write zero to INSN_CNT again.
            changes = self._on_stall(verbose, fetch_next=False)
            self.state.ext_regs.write('INSN_CNT', 0, True)
            return (None, changes)

        if self.state.fsm_state in [FsmState.POST_EXEC, FsmState.LOCKING]:
            return (None, self._on_stall(verbose, fetch_next=False))

        assert self.state.fsm_state == FsmState.EXEC

        insn = self._next_insn
        if insn is None:
            # Nothing has been fetched yet: spend this cycle fetching.
            return (None, self._on_stall(verbose, fetch_next=True))

        # Whether or not we're currently executing an instruction, we fetched
        # an instruction on the previous cycle. If that fetch failed then
        # insn.has_bits will be false. In that case, generate an error by
        # throwing away the generator so we start executing the (bogus)
        # instruction immediately.
        if not insn.has_bits:
            self._execute_generator = None

        if self._execute_generator is None:
            # This is the first cycle for an instruction. Run any setup for
            # the state object and then start running the instruction
            # itself.
            self.state.pre_insn(insn.affects_control)

            # Either execute the instruction directly (if it is a
            # single-cycle instruction without a `yield` in execute()), or
            # return a generator for multi-cycle instructions. Note that
            # this doesn't consume the first yielded value.
            self._execute_generator = insn.execute(self.state)

        if self._execute_generator is not None:
            # This is a cycle for a multi-cycle instruction (which possibly
            # started just above)
            try:
                next(self._execute_generator)
            except StopIteration:
                self._execute_generator = None

        # If the generator is still alive, the instruction needs at least one
        # more cycle, so this cycle counts as a stall.
        sim_stalled = (self._execute_generator is not None)
        if not sim_stalled:
            return (insn, self._on_retire(verbose, insn))

        return (None, self._on_stall(verbose, fetch_next=True))

    def dump_data(self) -> bytes:
        '''Return the contents of DMEM as produced by dmem.dump_le_words().'''
        return self.state.dmem.dump_le_words()

    def _print_trace(self, pc: int, disasm: str, changes: List[Trace]) -> None:
        '''Print a trace of the current instruction

        The line has three columns separated by " | ": the PC as 8 hex digits,
        the disassembly (padded to 45 characters) and a comma-separated list
        of the changes' trace() strings in square brackets.
        '''
        changes_str = ', '.join([t.trace() for t in changes])
        print('{:08x} | {:45} | [{}]'.format(pc, disasm, changes_str))