[otbn] Support "nonexistent" information-flow graphs.
Extend the information-flow analysis Python scripts to better represent
information flow where no paths exist by including "nonexistent" graphs
instead of using None. This makes a lot of the logic around seq() and
update() easier.
Signed-off-by: Jade Philipoom <jadep@google.com>
diff --git a/hw/ip/otbn/util/shared/cache.py b/hw/ip/otbn/util/shared/cache.py
new file mode 100644
index 0000000..8976697
--- /dev/null
+++ b/hw/ip/otbn/util/shared/cache.py
@@ -0,0 +1,61 @@
+# Copyright lowRISC contributors.
+# Licensed under the Apache License, Version 2.0, see LICENSE for details.
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Generic, Optional, TypeVar
+
+K = TypeVar('K')
+V = TypeVar('V')
+I = TypeVar('I')
+
+class CacheEntry(Generic[K,V]):
+ '''Represents a single entry in a cache.
+
+ The entry must hold two pieces of information:
+ - value, the cached result to be returned on a matching lookup
+ - key, data needed to determine if the entry matches a lookup (e.g. input
+ parameters to the function whose result has been cached)
+
+ Note that this is not a simple key/value store, because a key might match a
+ lookup even if it's not an exact match. Determining what exactly needs to
+ match is implementation-specific and implemented by subclasses.
+ '''
+ def __init__(self, key: K, value: V):
+ self.key = key
+ self.value = value
+
+ def is_match(self, key: K) -> bool:
+ '''Returns whether this entry is a match for the key.
+
+ In the simplest case, this could be just self.key == key; however, the
+ implementer might choose to ignore certain information when evaluating
+ the match, depending on the use-case.
+ '''
+ raise NotImplementedError()
+
+class Cache(Generic[I,K,V]):
+ '''Represents a cache to speed up recursive functions.
+
+ The cache is structured with two layers:
+ - The first layer is a dictionary that maps some hashable index type to the
+ second layer, a list for each dictionary entry.
+ - The second layer is a list of CacheEntry instances to be checked.
+
+ The purpose of the two-layer structure is to speed up lookups; the index
+ type should be used to quickly narrow things down to a limited number of
+ potentially matching entries (for instance, it could be an input parameter
+ to the function that absolutely must match for the cache entries to match).
+ '''
+ def __init__(self) -> None:
+ self.entries : Dict[I,List[CacheEntry[K,V]]] = {}
+
+ def add(self, index: I, entry: CacheEntry[K,V]) -> None:
+ # Only add if there's no matching entry already
+ if self.lookup(index, entry.key) is None:
+ self.entries.setdefault(index, []).append(entry)
+
+ def lookup(self, index: I, key: K) -> Optional[V]:
+ for entry in self.entries.get(index, []):
+ if entry.is_match(key):
+ return entry.value
+ return None
diff --git a/hw/ip/otbn/util/shared/information_flow.py b/hw/ip/otbn/util/shared/information_flow.py
index 69bdeb7..0ba2b79 100644
--- a/hw/ip/otbn/util/shared/information_flow.py
+++ b/hw/ip/otbn/util/shared/information_flow.py
@@ -10,7 +10,7 @@
FLAG_NAMES = ['c', 'm', 'l', 'z']
SPECIAL_REG_NAMES = ['mod', 'acc']
-
+READONLY = ['x0']
class InformationFlowGraph:
'''Represents an information flow graph.
@@ -25,9 +25,45 @@
means the sink is overwritten with a constant value, and information is not
flowing to it from any nodes (including its own previous value).
'''
- def __init__(self, flow: Dict[str, Set[str]]):
+ def __init__(self, flow: Dict[str, Set[str]], exists=True):
self.flow = flow
+ # Should not be modified directly. See the nonexistent() method
+ # documentation for details of what this flag means.
+ self.exists = exists
+
+ @staticmethod
+ def empty() -> 'InformationFlowGraph':
+ '''Represents the graph for a path in which nothing is modified.
+
+ For instance, if a block of code could be executed or not depending on
+ the processor state, then the path where the block is not executed
+ would be an empty() graph. Then, one could use update() to get a graph
+ representing the combination of the two possibilities.
+ '''
+ return InformationFlowGraph({})
+
+ @staticmethod
+ def nonexistent() -> 'InformationFlowGraph':
+ '''Represents the graph for a nonexistent path.
+
+ There is an important distinction between this and an "empty" graph. In
+ particular, for any graph G, G.update(nonexistent) = G, which is not
+ the case for a merely "empty" graph; since the update() method is
+ combining all possible paths, an empty graph means we need to consider
+ that all nodes might be unmodified, while a nonexistent graph has no
+ possible paths and therefore no effect.
+
+ A nonexistent graph can be thought of as "None", but handling it
+ directly within the class reduces the need for None checks.
+
+ For instance, imagine we want to represent the information flow for
+ only paths of a program that end in RET. If no paths from the current
+ point end in RET (because, for instance, all paths end the program with
+ ECALL), then a nonexistent graph would represent the information flow.
+ '''
+ return InformationFlowGraph({}, False)
+
def sources(self, sink: str) -> Set[str]:
'''Returns all sources for the given sink.'''
# if the sink does not appear, it is unmodified, meaning its only
@@ -64,6 +100,18 @@
Does not modify other.
'''
+ if not other.exists:
+ # If the other graph is nonexistent, then this is a no-op.
+ return
+
+ if not self.exists:
+ # Updating a nonexistent graph with another graph should return the
+ # other graph; since we need to modify self, we change this graph's
+ # flow to match other's.
+ self.flow = other.flow.copy()
+ self.exists = other.exists
+ return
+
for sink, sources in other.flow.items():
if sink not in self.flow:
# implicitly, a non-updated value depends only on itself (NOT
@@ -94,6 +142,11 @@
Defensively copies all source sets for the new graph.
'''
+ if not self.exists or not other.exists:
+ # If either this or the other graph is nonexistent, then the
+ # sequence is nonexistent.
+ return InformationFlowGraph.nonexistent()
+
flow = {}
for sink, sources in other.flow.items():
new_sources = set()
@@ -115,6 +168,9 @@
def pretty(self, indent: int = 0) -> str:
'''Return a human-readable representation of the graph.'''
+ if not self.exists:
+ return 'Nonexistent information-flow graph (no possible paths).'
+
prefix = ' ' * indent
flow_strings = {
sink: ','.join(sorted(sources))
@@ -129,24 +185,6 @@
sink))
return '\n'.join(lines)
-
-def safe_update_iflow(
- current: Optional[InformationFlowGraph],
- new: Optional[InformationFlowGraph]) -> Optional[InformationFlowGraph]:
- '''Updates one iflow with the other, handling None as appropriate.
-
- If `new` is None, simply returns `current`. If `new` is not None but
- `current` is None, returns `new`. If neither is None, updates `current` to
- include values from `new, then returns `current`.
- '''
- if new is None:
- return current
- if current is None:
- return new
- current.update(new)
- return current
-
-
class InsnInformationFlowNode:
'''Represents an information flow node whose value may depend on operands.
'''
@@ -457,15 +495,19 @@
def evaluate(
self, op_vals: Dict[str, int],
- constant_regs: Dict[str, int]) -> Optional[InformationFlowGraph]:
+ constant_regs: Dict[str, int]) -> InformationFlowGraph:
if not self.test.check(op_vals):
# Rule is not triggered
- return None
+ return InformationFlowGraph.nonexistent()
sources = set()
for node in self.flows_from:
sources.add(node.evaluate(op_vals, constant_regs))
flow = {}
for node in self.flows_to:
+ if node in READONLY:
+ # No information will actually flow to this node, because it is
+ # not writeable; skip.
+ continue
dest = node.evaluate(op_vals, constant_regs)
flow[dest] = sources.copy()
return InformationFlowGraph(flow)
@@ -518,18 +560,10 @@
def evaluate(self, op_vals: Dict[str, int],
constant_regs: Dict[str, int]) -> InformationFlowGraph:
- graph = None
+ graph = InformationFlowGraph.nonexistent()
for rule in self.rules:
rule_graph = rule.evaluate(op_vals, constant_regs)
- graph = safe_update_iflow(graph, rule_graph)
-
- if graph is None:
- # If no rules are triggered, return an empty graph
- graph = InformationFlowGraph({})
-
- # The x0 register is special and always zero; make sure the
- # information-flow graph shows it having no dependencies.
- graph.flow['x0'] = set()
+ graph.update(rule_graph)
return graph
diff --git a/hw/ip/otbn/util/shared/information_flow_analysis.py b/hw/ip/otbn/util/shared/information_flow_analysis.py
index 98a8736..8d17720 100755
--- a/hw/ip/otbn/util/shared/information_flow_analysis.py
+++ b/hw/ip/otbn/util/shared/information_flow_analysis.py
@@ -3,81 +3,23 @@
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
-from typing import Dict, Generic, List, Optional, Set, Tuple, TypeVar
+from typing import Dict, List, Optional, Set, Tuple
from .control_flow import *
+from .cache import CacheEntry, Cache
from .decode import OTBNProgram
-from .information_flow import (InformationFlowGraph, InsnInformationFlow,
- safe_update_iflow)
+from .information_flow import InformationFlowGraph, InsnInformationFlow
from .insn_yaml import Insn
from .operand import RegOperandType
from .section import CodeSection
-K = TypeVar('K')
-V = TypeVar('V')
-I = TypeVar('I')
-
-# TODO: move to other file
-class CacheEntry(Generic[K,V]):
- '''Represents a single entry in a cache.
-
- The entry must hold two pieces of information:
- - value, the cached result to be returned on a matching lookup
- - key, data needed to determine if the entry matches a lookup (e.g. input
- parameters to the function whose result has been cached)
-
- Note that this is not a simple key/value store, because a key might match a
- lookup even if it's not an exact match. Determining what exactly needs to
- match is implementation-specific and implemented by subclasses.
- '''
- def __init__(self, key: K, value: V):
- self.key = key
- self.value = value
-
- def is_match(self, key: K) -> bool:
- '''Returns whether this entry is a match for the key.
-
- In the simplest case, this could be just self.key == key; however, the
- implementer might choose to ignore certain information when evaluating
- the match, depending on the use-case.
- '''
- raise NotImplementedError()
-
-# TODO: move to other file
-class Cache(Generic[I,K,V]):
- '''Represents a cache to speed up recursive functions.
-
- The cache is structured with two layers:
- - The first layer is a dictionary that maps some hashable index type to the
- second layer, a list for each dictionary entry.
- - The second layer is a list of CacheEntry instances to be checked.
-
- The purpose of the two-layer structure is to speed up lookups; the index
- type should be used to quickly narrow things down to a limited number of
- potentially matching entries (for instance, it could be an input parameter
- to the function that absolutely must match for the cache entries to match).
- '''
- def __init__(self) -> None:
- self.entries : Dict[I,List[CacheEntry[K,V]]] = {}
-
- def add(self, index: I, entry: CacheEntry[K,V]) -> None:
- # Only add if there's no matching entry already
- if self.lookup(index, entry.key) is None:
- self.entries.setdefault(index, []).append(entry)
-
- def lookup(self, index: I, key: K) -> Optional[V]:
- for entry in self.entries.get(index, []):
- if entry.is_match(key):
- return entry.value
- return None
-
# Calls to _get_iflow return results in the form of a tuple with entries:
# used constants: a set containing the names of input constants the
# results depend on; passing a different constants state with
# these entries unchanged should not change the results
# return iflow: an information-flow graph for any paths ending in a return
# to the address that is on top of the call stack at the
-# original PC (None if there are no such paths)
+# original PC
# end iflow: an information-flow graph for any paths that lead to the
# end of the entire program (i.e. ECALL instruction or end of
# IMEM)
@@ -89,8 +31,7 @@
# the program; the value is a set of PCs of control-flow
# instructions through which the node influences the control
# flow
-IFlowResult = Tuple[Set[str], Optional[InformationFlowGraph],
- Optional[InformationFlowGraph], Dict[str, int],
+IFlowResult = Tuple[Set[str], InformationFlowGraph, InformationFlowGraph, Dict[str, int],
Dict[str, Set[int]]]
class IFlowCacheEntry(CacheEntry[Dict[str,int], IFlowResult]):
@@ -105,7 +46,7 @@
'''
def is_match(self, constants: Dict[str,int]) -> bool:
for k,v in self.key.items():
- if constants[k] != v:
+ if constants.get(k, None) != v:
return False
return True
@@ -121,13 +62,12 @@
# The information flow of a subroutine is represented as a tuple whose entries
# are a subset of the IFlowResult tuple entries; in particular, it has the form
# (return iflow, end iflow, control deps).
-SubroutineIFlow = Tuple[Optional[InformationFlowGraph],
- Optional[InformationFlowGraph], Dict[str, Set[int]]]
+SubroutineIFlow = Tuple[InformationFlowGraph,InformationFlowGraph, Dict[str, Set[int]]]
# The information flow of a full program is the same as for a subroutine,
# except with no "return" information flow. Since the call stack is empty
# at the start, we're not expecting any return paths!
-ProgramIFlow = Tuple[Optional[InformationFlowGraph], Dict[str, Set[int]]]
+ProgramIFlow = Tuple[InformationFlowGraph, Dict[str, Set[int]]]
def _get_op_val_str(insn: Insn, op_vals: Dict[str, int], opname: str) -> str:
@@ -148,10 +88,7 @@
Modifies the input `constants` dictionary.
'''
- # Make sure x0 is always constant (0), since it is not writeable
- constants['x0'] = 0
-
- new_constants = {}
+ new_constants = {'x0': 0}
if insn.mnemonic == 'addi':
grs1_name = _get_op_val_str(insn, op_vals, 'grs1')
if grs1_name in constants:
@@ -184,9 +121,6 @@
constants.update(new_constants)
- # Make sure x0 is always constant (0), since it is not writeable
- constants['x0'] = 0
-
return
@@ -265,7 +199,7 @@
encountering a control-flow instruction. Updates `constants` to hold the
constant values after the section has finished.
'''
- iflow = InformationFlowGraph({'x0': set()})
+ iflow = InformationFlowGraph.empty()
constant_deps = set()
for pc in range(start_pc, end_pc + 4, 4):
@@ -349,9 +283,9 @@
def _get_iflow_update_state(
rec_result: IFlowResult, iflow: InformationFlowGraph,
- program_end_iflow: Optional[InformationFlowGraph],
+ program_end_iflow: InformationFlowGraph,
used_constants: Set[str],
- control_deps: Dict[str, Set[int]]) -> Optional[InformationFlowGraph]:
+ control_deps: Dict[str, Set[int]]) -> InformationFlowGraph:
'''Update the internal state of _get_iflow after a recursive call.
The `used_constants` and `control_deps` state elements are updated in
@@ -366,9 +300,7 @@
_update_control_deps(control_deps, iflow, rec_control_deps)
# Update information flow results for paths where the program ends
- if rec_program_end_iflow is not None:
- end_iflow = iflow.seq(rec_program_end_iflow)
- program_end_iflow = safe_update_iflow(program_end_iflow, end_iflow)
+ program_end_iflow.update(iflow.seq(rec_program_end_iflow))
return program_end_iflow
@@ -396,12 +328,13 @@
# The combined information flow for all paths leading to the end of the
# subroutine (i.e. a RET, not counting RETS that happen after jumps within
- # the subroutine at start_pc)
- return_iflow = None
+ # the subroutine at start_pc). Initialize as nonexistent() because no paths
+ # have yet been found.
+ return_iflow = InformationFlowGraph.nonexistent()
# The combined information flow for all paths leading to the end of the
# program (i.e. an ECALL or the end of IMEM)
- program_end_iflow = None
+ program_end_iflow = InformationFlowGraph.nonexistent()
# The control-flow nodes whose values at the start PC influence control
# flow
@@ -486,8 +419,7 @@
# Compose current iflow with the flow for paths that hit the end of
# the loop
- if body_return_iflow is not None:
- iflow = iflow.seq(body_return_iflow)
+ iflow = iflow.seq(body_return_iflow)
# Set the next edges to the instruction after the loop ends
edges = [ControlLoc(body_loc.loop_end_pc + 4)]
@@ -537,10 +469,7 @@
# Clear common constants; there are no return branches here
common_constants = {'x0': 0}
- if program_end_iflow is None:
- program_end_iflow = iflow
- else:
- program_end_iflow.update(iflow)
+ program_end_iflow.update(iflow)
elif isinstance(loc, Ret):
if loop_end_pc is not None:
raise RuntimeError(
@@ -554,10 +483,7 @@
# Since this is the only edge, common_constants must be unset
common_constants = constants
- if return_iflow is None:
- return_iflow = iflow
- else:
- return_iflow.update(iflow)
+ return_iflow.update(iflow)
elif isinstance(loc, LoopStart):
# We shouldn't hit a LoopStart here; those cases (a loop
@@ -587,9 +513,7 @@
# Update return_iflow with the current iflow composed with return
# paths
- if rec_return_iflow is not None:
- return_iflow = safe_update_iflow(return_iflow,
- iflow.seq(rec_return_iflow))
+ return_iflow.update(iflow.seq(rec_return_iflow))
else:
raise RuntimeError(
'Unexpected next control location type at PC {:#x}: {}'.format(
@@ -642,7 +566,7 @@
start_pc = program.get_pc_at_symbol(subroutine_name)
_, ret_iflow, end_iflow, _, control_deps = _get_iflow(
program, graph, start_pc, {'x0': 0}, None, IFlowCache())
- if ret_iflow is None and end_iflow is None:
+ if not (ret_iflow.exists or end_iflow.exists):
raise ValueError('Could not find any complete control-flow paths when '
'analyzing subroutine.')
return ret_iflow, end_iflow, control_deps
@@ -661,11 +585,11 @@
check_acyclic(graph)
_, ret_iflow, end_iflow, _, control_deps = _get_iflow(
program, graph, program.min_pc(), {'x0': 0}, None, IFlowCache())
- if ret_iflow is not None:
+ if ret_iflow.exists:
# No paths from imem_start should end in RET
raise ValueError('Unexpected information flow for paths ending in RET '
'when analyzing whole program.')
- assert end_iflow is not None
+ assert end_iflow.exists
return end_iflow, control_deps