| # Copyright lowRISC contributors. |
| # Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| from typing import Dict, Iterator, List, Optional, Tuple |
| |
| from .constants import ErrBits, Status |
| from .decode import EmptyInsn |
| from .isa import OTBNInsn |
| from .state import OTBNState, FsmState |
| from .stats import ExecutionStats |
| from .trace import Trace |
| |
| # A dictionary that defines a function of the form "address -> from -> to". If |
| # PC is the current PC and cnt is the count for the innermost loop then |
| # warps[PC][cnt] = new_cnt means that we should warp the current count to |
| # new_cnt. |
| LoopWarps = Dict[int, Dict[int, int]] |
| |
| # The return type of the Step function: a possible instruction that was |
| # executed, together with a list of changes. |
| StepRes = Tuple[Optional[OTBNInsn], List[Trace]] |
| |
| |
| class OTBNSim: |
| def __init__(self) -> None: |
| self.state = OTBNState() |
| self.program = [] # type: List[OTBNInsn] |
| self.loop_warps = {} # type: LoopWarps |
| self.stats = None # type: Optional[ExecutionStats] |
| self._execute_generator = None # type: Optional[Iterator[None]] |
| self._next_insn = None # type: Optional[OTBNInsn] |
| |
| def load_program(self, program: List[OTBNInsn]) -> None: |
| self.program = program.copy() |
| self.state.clear_imem_invalidation() |
| |
| def add_loop_warp(self, addr: int, from_cnt: int, to_cnt: int) -> None: |
| '''Add a new loop warp to the simulation''' |
| self.loop_warps.setdefault(addr, {})[from_cnt] = to_cnt |
| |
| def load_data(self, data: bytes, has_validity: bool) -> None: |
| '''Load bytes into DMEM, starting at address zero. |
| |
| If has_validity is true, each 32-bit word should be represented by 5 |
| bytes (1 byte that says whether the word is valid, then 4 bytes that |
| give the word in little-endian format). If has_validity is false, each |
| word is considered valid and is represented by 4 bytes in little-endian |
| format. |
| |
| ''' |
| self.state.dmem.load_le_words(data, has_validity) |
| |
| def start(self, collect_stats: bool) -> None: |
| '''Prepare to start the execution. |
| |
| Use run() or step() to actually execute the program. |
| |
| ''' |
| if self.state.get_fsm_state() != FsmState.IDLE: |
| return |
| self.stats = ExecutionStats(self.program) if collect_stats else None |
| self._execute_generator = None |
| self._next_insn = None |
| self.state.start() |
| |
| def initial_secure_wipe(self) -> None: |
| '''This is run at the start of a secure wipe after reset.''' |
| self.state.start_init_sec_wipe() |
| |
| def start_mem_wipe(self, is_imem: bool) -> None: |
| if self.state.get_fsm_state() != FsmState.IDLE: |
| return |
| |
| new_status = (Status.BUSY_SEC_WIPE_IMEM |
| if is_imem else Status.BUSY_SEC_WIPE_DMEM) |
| |
| self.state.set_fsm_state(FsmState.MEM_SEC_WIPE) |
| self.state.ext_regs.write('STATUS', new_status, True) |
| |
| def _fetch(self, pc: int) -> OTBNInsn: |
| word_pc = pc >> 2 |
| if word_pc >= len(self.program): |
| raise RuntimeError('Trying to execute instruction at address ' |
| '{:#x}, but the program is only {:#x} ' |
| 'bytes ({} instructions) long. Since there ' |
| 'are no architectural contents of the ' |
| 'memory here, we have to stop.' |
| .format(pc, |
| 4 * len(self.program), |
| len(self.program))) |
| |
| if self.state.invalidated_imem: |
| return EmptyInsn(self.state.pc) |
| |
| return self.program[word_pc] |
| |
| def _on_stall(self, |
| verbose: bool, |
| fetch_next: bool) -> List[Trace]: |
| '''This is run on a stall cycle''' |
| self.state.stop_if_pending_halt() |
| changes = self.state.changes() |
| self.state.commit(sim_stalled=True) |
| if fetch_next: |
| self._next_insn = self._fetch(self.state.pc) |
| if self.stats is not None and not self.state.wiping(): |
| self.stats.record_stall() |
| if verbose: |
| self._print_trace(self.state.pc, '(stall)', changes) |
| |
| return changes |
| |
| def _on_retire(self, |
| verbose: bool, |
| insn: OTBNInsn) -> List[Trace]: |
| '''This is run when an instruction completes''' |
| assert self._execute_generator is None |
| self.state.post_insn(self.loop_warps.get(self.state.pc, {})) |
| |
| if self.stats is not None: |
| self.stats.record_insn(insn, self.state) |
| |
| halting = self.state.stop_if_pending_halt() |
| changes = self.state.changes() |
| |
| # Program counter before commit |
| pc_before = self.state.pc |
| self.state.commit(sim_stalled=False) |
| |
| # Fetch the next instruction unless we're done or this instruction had |
| # `has_fetch_stall` set (in which case we inject a single cycle stall). |
| no_fetch = halting or insn.has_fetch_stall |
| self._next_insn = None if no_fetch else self._fetch(self.state.pc) |
| |
| disasm = insn.disassemble(pc_before) |
| if verbose: |
| self._print_trace(pc_before, disasm, changes) |
| |
| return changes |
| |
| def step(self, verbose: bool) -> StepRes: |
| '''Run a single cycle. |
| |
| Returns the instruction, together with a list of the architectural |
| changes that have happened. If the model isn't currently executing, |
| returns no instruction and no changes. |
| |
| ''' |
| fsm_state = self.state.get_fsm_state() |
| # Pairs: (stepper, handles_injected_err). If handles_injected_err is |
| # False then the generic code here will deal with any pending errors in |
| # self.state.injected_err_bits. If True, then we expect the stepper |
| # function to handle them. |
| steppers = { |
| FsmState.MEM_SEC_WIPE: (self._step_ext_wipe, False), |
| FsmState.IDLE: (self._step_idle, False), |
| FsmState.PRE_EXEC: (self._step_pre_exec, False), |
| FsmState.EXEC: (self._step_exec, True), |
| FsmState.WIPING_GOOD: (self._step_wiping, False), |
| FsmState.WIPING_BAD: (self._step_wiping, False), |
| FsmState.LOCKED: (self._step_idle, False) |
| } |
| |
| stepper, handles_injected_err = steppers[fsm_state] |
| self.state.step(not handles_injected_err) |
| |
| return stepper(verbose) |
| |
| def _step_idle(self, verbose: bool) -> StepRes: |
| '''Step the simulation when OTBN is IDLE or LOCKED''' |
| self.state.stop_if_pending_halt() |
| if ((self.state._fsm_state == FsmState.LOCKED and |
| self.state.cycles_in_this_state == 0)): |
| self.state.ext_regs.write('INSN_CNT', 0, True) |
| |
| if self.state.init_sec_wipe_is_running(): |
| # Wait for the URND seed. Once that appears, switch to WIPING_GOOD |
| # unless the FSM state is already LOCKED, in which case we change it |
| # to WIPING_BAD. |
| if self.state.wsrs.URND.running: |
| if self.state.get_fsm_state() == FsmState.LOCKED: |
| self.state.set_fsm_state(FsmState.WIPING_BAD) |
| else: |
| self.state.set_fsm_state(FsmState.WIPING_GOOD) |
| |
| changes = self.state.changes() |
| self.state.commit(sim_stalled=True) |
| return (None, changes) |
| |
| def _step_ext_wipe(self, verbose: bool) -> StepRes: |
| '''Step the simulation DMEM/IMEM wipe operation''' |
| self.state.stop_if_pending_halt() |
| changes = self.state.changes() |
| self.state.commit(sim_stalled=True) |
| return (None, changes) |
| |
| def _step_pre_exec(self, verbose: bool) -> StepRes: |
| '''Step the simulation in the PRE_EXEC state |
| |
| In this state, we're waiting for a URND seed. Once that appears, we |
| switch to EXEC. |
| ''' |
| if self.state.wsrs.URND.running: |
| self.state.set_fsm_state(FsmState.EXEC) |
| |
| changes = self._on_stall(verbose, fetch_next=False) |
| |
| # Zero INSN_CNT the cycle after we are told to start |
| if self.state.ext_regs.read('INSN_CNT', True) != 0: |
| self.state.ext_regs.write('INSN_CNT', 0, True) |
| |
| return (None, changes) |
| |
| def _step_exec(self, verbose: bool) -> StepRes: |
| '''Step the simulation when executing code''' |
| |
| # The initial secure wipe *must* be done when executing code. |
| assert(self.state.init_sec_wipe_is_done()) |
| |
| self.state.wsrs.URND.step() |
| |
| insn = self._next_insn |
| if insn is None: |
| self.state.take_injected_err_bits() |
| return (None, self._on_stall(verbose, fetch_next=True)) |
| |
| # Whether or not we're currently executing an instruction, we fetched |
| # an instruction on the previous cycle. If that fetch failed then |
| # insn.has_bits will be false. In that case, generate an error by |
| # throwing away the generator so we start executing the (bogus) |
| # instruction immediately. |
| if not insn.has_bits: |
| self._execute_generator = None |
| |
| if self._execute_generator is None: |
| # This is the first cycle for an instruction. Run any setup for |
| # the state object and then start running the instruction |
| # itself. |
| self.state.pre_insn(insn.affects_control) |
| |
| # Either execute the instruction directly (if it is a |
| # single-cycle instruction without a `yield` in execute()), or |
| # return a generator for multi-cycle instructions. Note that |
| # this doesn't consume the first yielded value. |
| self._execute_generator = insn.execute(self.state) |
| |
| if self._execute_generator is not None: |
| # This is a cycle for a multi-cycle instruction (which possibly |
| # started just above) |
| try: |
| next(self._execute_generator) |
| except StopIteration: |
| self._execute_generator = None |
| |
| if (self.state.wsrs.RND.rep_err_escalate): |
| self.state.stop_at_end_of_cycle(ErrBits.RND_REP_CHK_FAIL) |
| if (self.state.wsrs.RND.fips_err_escalate): |
| self.state.stop_at_end_of_cycle(ErrBits.RND_FIPS_CHK_FAIL) |
| |
| # Handle any pending injected error. Note that this has to run after |
| # we've executed any instruction, to ensure we get a trace entry for |
| # that instruction before it gets shot down. |
| self.state.take_injected_err_bits() |
| |
| # If something bad happened asynchronously (because of an escalation), |
| # we might have an unfinished instruction. But we want to turn it into |
| # a "finished, but aborted" one. |
| if self.state.pending_halt: |
| self._execute_generator = None |
| |
| sim_stalled = (self._execute_generator is not None) |
| if not sim_stalled: |
| return (insn, self._on_retire(verbose, insn)) |
| |
| return (None, self._on_stall(verbose, fetch_next=False)) |
| |
| def _step_wiping(self, verbose: bool) -> StepRes: |
| '''Step the simulation when wiping''' |
| assert self.state.wipe_cycles >= 0 |
| if self.state.wipe_cycles > 0: |
| self.state.wipe_cycles -= 1 |
| |
| # If something bad happened asynchronously (because of an escalation), |
| # we want to finish the secure wipe but accept no further commands. To |
| # this end, turn this into a "wipe because something bad happended". |
| if self.state.pending_halt: |
| self.state._fsm_state = FsmState.WIPING_BAD |
| |
| # Reflect wiping in STATUS register if it has not been updated yet. |
| if (self.state.wipe_cycles > 0 and |
| (self.state.ext_regs.read('STATUS', True) not in |
| [Status.BUSY_SEC_WIPE_INT, Status.LOCKED])): |
| self.state.ext_regs.write('STATUS', Status.BUSY_SEC_WIPE_INT, True) |
| |
| is_good = self.state.get_fsm_state() == FsmState.WIPING_GOOD |
| |
| # Clear the WIPE_START register if it was set. |
| if self.state.ext_regs.read('WIPE_START', True): |
| self.state.ext_regs.write('WIPE_START', 0, True) |
| |
| # Zero INSN_CNT once if we're in state WIPING_BAD. |
| if not is_good and self.state.ext_regs.read('INSN_CNT', True) != 0: |
| if self.state.zero_insn_cnt_next or not self.state.lock_immediately: |
| self.state.ext_regs.write('INSN_CNT', 0, True) |
| self.state.zero_insn_cnt_next = False |
| if self.state.lock_immediately: |
| # Zero INSN_CNT in the *next* cycle to match RTL control flow. |
| self.state.zero_insn_cnt_next = True |
| |
| if self.state.wipe_cycles == 1: |
| if self.state.first_round_of_wipe: |
| # Request URND refresh before second round. |
| self.state.wsrs.URND.running = False |
| self.state._urnd_client.request() |
| else: |
| # Wipe all registers and set STATUS on the penultimate cycle. |
| next_status = Status.IDLE if is_good else Status.LOCKED |
| self.state.ext_regs.write('STATUS', next_status, True) |
| self.state.wipe() |
| |
| # On the final cycle, set the next state to IDLE or LOCKED. If an |
| # initial secure wipe was in progress, it is now done (if the wipe was |
| # good). |
| if self.state.wipe_cycles == 0: |
| if self.state.first_round_of_wipe: |
| # Once the URND refresh is acknowledged, do second round of wipe. |
| if self.state.wsrs.URND.running: |
| self.state.first_round_of_wipe = False |
| self.state.set_fsm_state(self.state.get_fsm_state()) |
| else: |
| if is_good: |
| next_state = FsmState.IDLE |
| if self.state.init_sec_wipe_is_running(): |
| self.state.complete_init_sec_wipe() |
| else: |
| next_state = FsmState.LOCKED |
| |
| # Also, set wipe_cycles to an invalid value to make really sure |
| # we've left the wiping code. |
| self.wipe_cycles = -1 |
| self.state.first_round_of_wipe = True |
| self.state.set_fsm_state(next_state) |
| |
| return (None, self._on_stall(verbose, fetch_next=False)) |
| |
| def dump_data(self) -> bytes: |
| return self.state.dmem.dump_le_words() |
| |
| def _print_trace(self, pc: int, disasm: str, changes: List[Trace]) -> None: |
| '''Print a trace of the current instruction''' |
| changes_str = ', '.join([t.trace() for t in changes]) |
| print('{:08x} | {:45} | [{}]'.format(pc, disasm, changes_str)) |
| |
| def on_otp_cdc_done(self) -> None: |
| '''Signifies when the scrambling key request gets processed''' |
| # This happens when we're doing a memory secure wipe, in which case we |
| # want to switch to IDLE. However, it also happens if we're at the end |
| # of a run that either will lock or already has done. In that case, we |
| # don't want to do an FSM state change. |
| cur_state = self.state.get_fsm_state() |
| assert cur_state in [FsmState.MEM_SEC_WIPE, |
| FsmState.WIPING_BAD, FsmState.LOCKED] |
| if cur_state == FsmState.MEM_SEC_WIPE: |
| self.state.ext_regs.write('STATUS', Status.IDLE, True) |
| self.state.set_fsm_state(FsmState.IDLE) |
| |
| def send_err_escalation(self, err_val: int, lock_immediately: bool) -> None: |
| '''React to an error escalation''' |
| assert err_val & ~ErrBits.MASK == 0 |
| self.state.injected_err_bits |= err_val |
| self.state.lock_immediately = lock_immediately |
| |
| def send_rma_req(self) -> None: |
| # Incoming RMA request basically acts like a fatal error. |
| # It does not immediately change the status to LOCKED just like any |
| # fatal error except spurious URND ack. |
| self.state.rma_req = True |
| self.state.pending_halt = True |