[otbn] Better cache support for information flow analysis. Add a class for caches to simplify the Python script that analyzes OTBN information flow. Signed-off-by: Jade Philipoom <jadep@google.com>
diff --git a/hw/ip/otbn/util/shared/information_flow_analysis.py b/hw/ip/otbn/util/shared/information_flow_analysis.py index 519493a..98a8736 100755 --- a/hw/ip/otbn/util/shared/information_flow_analysis.py +++ b/hw/ip/otbn/util/shared/information_flow_analysis.py
@@ -3,7 +3,7 @@ # Licensed under the Apache License, Version 2.0, see LICENSE for details. # SPDX-License-Identifier: Apache-2.0 -from typing import Dict, List, Optional, Set, Tuple +from typing import Dict, Generic, List, Optional, Set, Tuple, TypeVar from .control_flow import * from .decode import OTBNProgram @@ -13,6 +13,64 @@ from .operand import RegOperandType from .section import CodeSection +K = TypeVar('K') +V = TypeVar('V') +I = TypeVar('I') + +# TODO: move to other file +class CacheEntry(Generic[K,V]): + '''Represents a single entry in a cache. + + The entry must hold two pieces of information: + - value, the cached result to be returned on a matching lookup + - key, data needed to determine if the entry matches a lookup (e.g. input + parameters to the function whose result has been cached) + + Note that this is not a simple key/value store, because a key might match a + lookup even if it's not an exact match. Determining what exactly needs to + match is implementation-specific and implemented by subclasses. + ''' + def __init__(self, key: K, value: V): + self.key = key + self.value = value + + def is_match(self, key: K) -> bool: + '''Returns whether this entry is a match for the key. + + In the simplest case, this could be just self.key == key; however, the + implementer might choose to ignore certain information when evaluating + the match, depending on the use-case. + ''' + raise NotImplementedError() + +# TODO: move to other file +class Cache(Generic[I,K,V]): + '''Represents a cache to speed up recursive functions. + + The cache is structured with two layers: + - The first layer is a dictionary that maps some hashable index type to the + second layer, a list for each dictionary entry. + - The second layer is a list of CacheEntry instances to be checked. + + The purpose of the two-layer structure is to speed up lookups; the index + type should be used to quickly narrow things down to a limited number of + potentially matching entries (for instance, it could be an input parameter + to the function that absolutely must match for the cache entries to match). + ''' + def __init__(self) -> None: + self.entries : Dict[I,List[CacheEntry[K,V]]] = {} + + def add(self, index: I, entry: CacheEntry[K,V]) -> None: + # Only add if there's no matching entry already + if self.lookup(index, entry.key) is None: + self.entries.setdefault(index, []).append(entry) + + def lookup(self, index: I, key: K) -> Optional[V]: + for entry in self.entries.get(index, []): + if entry.is_match(key): + return entry.value + return None + # Calls to _get_iflow return results in the form of a tuple with entries: # used constants: a set containing the names of input constants the # results depend on; passing a different constants state with @@ -35,13 +93,30 @@ Optional[InformationFlowGraph], Dict[str, int], Dict[str, Set[int]]] -# The cache for _get_iflow maps starting PCs to lists of cache entries for -# those PCs. Each cache entry is a 3-tuple with the following members: -# constant values: a dictionary containing the values of used constants; the -# keys should match the members of the IFlowResult's used -# constants -# result: the result returned from the _get_iflow call -IFlowCache = Dict[int, List[Tuple[Dict[str, int], IFlowResult]]] +class IFlowCacheEntry(CacheEntry[Dict[str,int], IFlowResult]): + '''Represents an entry in the cache for _get_iflow. + + The key for the cache entry is the values of certain constants in the input + `constants` dictionary for the _get_iflow call that produced the cached + result. Only constants that were actually used in the process of computing + the result are stored in the key; if another call to _get_iflow has the + same values for those constants but different values for others, the result + should not change. + ''' + def is_match(self, constants: Dict[str,int]) -> bool: + for k,v in self.key.items(): + if constants[k] != v: + return False + return True + +class IFlowCache(Cache[int,Dict[str,int], IFlowResult]): + '''Represents the cache for _get_iflow. + + The index of the cache is the start PC for the call to _get_iflow. If this + index and the values of the constants used in the call match a new call, + the cached result is returned. + ''' + pass # The information flow of a subroutine is represented as a tuple whose entries # are a subset of the IFlowResult tuple entries; in particular, it has the form @@ -236,41 +311,17 @@ assert False -def _get_iflow_cache_fetch(pc: int, constants: Dict[str, int], - cache: IFlowCache) -> Optional[IFlowResult]: - '''Returns a matching result from the cache if there is one. - - The cache format maps the PC to a list of cache entries, with each entry - holding the cached result as well as the values of constants that it - depends on. The entry is a match if and only if those constant values match - the current constants. - ''' - if pc in cache: - for constant_deps, result in cache[pc]: - all_match = True - for name, value in constant_deps.items(): - if constants.get(name, None) != value: - all_match = False - break - if all_match: - return result - return None - - def _get_iflow_cache_update(pc: int, constants: Dict[str, int], - rec: IFlowResult, cache: IFlowCache) -> None: - '''Updates the cache for _get_iflow after a recursive call.''' - rec_used_constants = rec[0] + result: IFlowResult, cache: IFlowCache) -> None: + '''Updates the cache for _get_iflow.''' + used_constants = result[0] used_constant_values = {} - for name in rec_used_constants: + for name in used_constants: assert name in constants used_constant_values[name] = constants[name] - # Don't bother if an entry already exists - if _get_iflow_cache_fetch(pc, used_constant_values, cache) is not None: - return + cache.add(pc, IFlowCacheEntry(used_constant_values, result)) - cache.setdefault(pc, []).append((used_constant_values, rec)) return @@ -337,7 +388,7 @@ Caches results from recursive calls (updating input cache). Does not modify start_constants. ''' - cached = _get_iflow_cache_fetch(start_pc, start_constants, cache) + cached = cache.lookup(start_pc, start_constants) if cached is not None: return cached @@ -590,7 +641,7 @@ check_acyclic(graph) start_pc = program.get_pc_at_symbol(subroutine_name) _, ret_iflow, end_iflow, _, control_deps = _get_iflow( - program, graph, start_pc, {'x0': 0}, None, {}) + program, graph, start_pc, {'x0': 0}, None, IFlowCache()) if ret_iflow is None and end_iflow is None: raise ValueError('Could not find any complete control-flow paths when ' 'analyzing subroutine.') @@ -609,7 +660,7 @@ ''' check_acyclic(graph) _, ret_iflow, end_iflow, _, control_deps = _get_iflow( - program, graph, program.min_pc(), {'x0': 0}, None, {}) + program, graph, program.min_pc(), {'x0': 0}, None, IFlowCache()) if ret_iflow is not None: # No paths from imem_start should end in RET raise ValueError('Unexpected information flow for paths ending in RET '