[otbn] Better cache support for information flow analysis.
Add a class for caches to simplify the Python script that analyzes OTBN
information flow.
Signed-off-by: Jade Philipoom <jadep@google.com>
diff --git a/hw/ip/otbn/util/shared/information_flow_analysis.py b/hw/ip/otbn/util/shared/information_flow_analysis.py
index 519493a..98a8736 100755
--- a/hw/ip/otbn/util/shared/information_flow_analysis.py
+++ b/hw/ip/otbn/util/shared/information_flow_analysis.py
@@ -3,7 +3,7 @@
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
-from typing import Dict, List, Optional, Set, Tuple
+from typing import Dict, Generic, List, Optional, Set, Tuple, TypeVar
from .control_flow import *
from .decode import OTBNProgram
@@ -13,6 +13,64 @@
from .operand import RegOperandType
from .section import CodeSection
+K = TypeVar('K')
+V = TypeVar('V')
+I = TypeVar('I')
+
+# TODO: move to other file
+class CacheEntry(Generic[K,V]):
+ '''Represents a single entry in a cache.
+
+ The entry must hold two pieces of information:
+ - value, the cached result to be returned on a matching lookup
+ - key, data needed to determine if the entry matches a lookup (e.g. input
+ parameters to the function whose result has been cached)
+
+ Note that this is not a simple key/value store, because a key might match a
+ lookup even if it's not an exact match. Determining what exactly needs to
+ match is implementation-specific and implemented by subclasses.
+ '''
+ def __init__(self, key: K, value: V):
+ self.key = key
+ self.value = value
+
+ def is_match(self, key: K) -> bool:
+ '''Returns whether this entry is a match for the key.
+
+ In the simplest case, this could be just self.key == key; however, the
+ implementer might choose to ignore certain information when evaluating
+ the match, depending on the use-case.
+ '''
+ raise NotImplementedError()
+
+# TODO: move to other file
+class Cache(Generic[I,K,V]):
+ '''Represents a cache to speed up recursive functions.
+
+ The cache is structured with two layers:
+ - The first layer is a dictionary that maps some hashable index type to the
+ second layer, a list for each dictionary entry.
+ - The second layer is a list of CacheEntry instances to be checked.
+
+ The purpose of the two-layer structure is to speed up lookups; the index
+ type should be used to quickly narrow things down to a limited number of
+ potentially matching entries (for instance, it could be an input parameter
+ to the function that absolutely must match for the cache entries to match).
+ '''
+ def __init__(self) -> None:
+ self.entries : Dict[I,List[CacheEntry[K,V]]] = {}
+
+ def add(self, index: I, entry: CacheEntry[K,V]) -> None:
+ # Only add if there's no matching entry already
+ if self.lookup(index, entry.key) is None:
+ self.entries.setdefault(index, []).append(entry)
+
+ def lookup(self, index: I, key: K) -> Optional[V]:
+ for entry in self.entries.get(index, []):
+ if entry.is_match(key):
+ return entry.value
+ return None
+
# Calls to _get_iflow return results in the form of a tuple with entries:
# used constants: a set containing the names of input constants the
# results depend on; passing a different constants state with
@@ -35,13 +93,30 @@
Optional[InformationFlowGraph], Dict[str, int],
Dict[str, Set[int]]]
-# The cache for _get_iflow maps starting PCs to lists of cache entries for
-# those PCs. Each cache entry is a 3-tuple with the following members:
-# constant values: a dictionary containing the values of used constants; the
-# keys should match the members of the IFlowResult's used
-# constants
-# result: the result returned from the _get_iflow call
-IFlowCache = Dict[int, List[Tuple[Dict[str, int], IFlowResult]]]
+class IFlowCacheEntry(CacheEntry[Dict[str,int], IFlowResult]):
+ '''Represents an entry in the cache for _get_iflow.
+
+ The key for the cache entry is the values of certain constants in the input
+ `constants` dictionary for the _get_iflow call that produced the cached
+ result. Only constants that were actually used in the process of computing
+ the result are stored in the key; if another call to _get_iflow has the
+ same values for those constants but different values for others, the result
+ should not change.
+ '''
+ def is_match(self, constants: Dict[str,int]) -> bool:
+ for k,v in self.key.items():
+ if constants[k] != v:
+ return False
+ return True
+
+class IFlowCache(Cache[int,Dict[str,int], IFlowResult]):
+ '''Represents the cache for _get_iflow.
+
+ The index of the cache is the start PC for the call to _get_iflow. If this
+ index and the values of the constants used in the call match a new call,
+ the cached result is returned.
+ '''
+ pass
# The information flow of a subroutine is represented as a tuple whose entries
# are a subset of the IFlowResult tuple entries; in particular, it has the form
@@ -236,41 +311,17 @@
assert False
-def _get_iflow_cache_fetch(pc: int, constants: Dict[str, int],
- cache: IFlowCache) -> Optional[IFlowResult]:
- '''Returns a matching result from the cache if there is one.
-
- The cache format maps the PC to a list of cache entries, with each entry
- holding the cached result as well as the values of constants that it
- depends on. The entry is a match if and only if those constant values match
- the current constants.
- '''
- if pc in cache:
- for constant_deps, result in cache[pc]:
- all_match = True
- for name, value in constant_deps.items():
- if constants.get(name, None) != value:
- all_match = False
- break
- if all_match:
- return result
- return None
-
-
def _get_iflow_cache_update(pc: int, constants: Dict[str, int],
- rec: IFlowResult, cache: IFlowCache) -> None:
- '''Updates the cache for _get_iflow after a recursive call.'''
- rec_used_constants = rec[0]
+ result: IFlowResult, cache: IFlowCache) -> None:
+ '''Updates the cache for _get_iflow.'''
+ used_constants = result[0]
used_constant_values = {}
- for name in rec_used_constants:
+ for name in used_constants:
assert name in constants
used_constant_values[name] = constants[name]
- # Don't bother if an entry already exists
- if _get_iflow_cache_fetch(pc, used_constant_values, cache) is not None:
- return
+ cache.add(pc, IFlowCacheEntry(used_constant_values, result))
- cache.setdefault(pc, []).append((used_constant_values, rec))
return
@@ -337,7 +388,7 @@
Caches results from recursive calls (updating input cache). Does not modify
start_constants.
'''
- cached = _get_iflow_cache_fetch(start_pc, start_constants, cache)
+ cached = cache.lookup(start_pc, start_constants)
if cached is not None:
return cached
@@ -590,7 +641,7 @@
check_acyclic(graph)
start_pc = program.get_pc_at_symbol(subroutine_name)
_, ret_iflow, end_iflow, _, control_deps = _get_iflow(
- program, graph, start_pc, {'x0': 0}, None, {})
+ program, graph, start_pc, {'x0': 0}, None, IFlowCache())
if ret_iflow is None and end_iflow is None:
raise ValueError('Could not find any complete control-flow paths when '
'analyzing subroutine.')
@@ -609,7 +660,7 @@
'''
check_acyclic(graph)
_, ret_iflow, end_iflow, _, control_deps = _get_iflow(
- program, graph, program.min_pc(), {'x0': 0}, None, {})
+ program, graph, program.min_pc(), {'x0': 0}, None, IFlowCache())
if ret_iflow is not None:
# No paths from imem_start should end in RET
raise ValueError('Unexpected information flow for paths ending in RET '