[otbn] Support analysis of non-loop control-flow cycles.
Extend OTBN information-flow analysis scripts to support more types of
cyclic control flow than loops, for instance jumps/branches that link to
each other repeatedly until some condition is met.
Signed-off-by: Jade Philipoom <jadep@google.com>
diff --git a/hw/ip/otbn/util/analyze_information_flow.py b/hw/ip/otbn/util/analyze_information_flow.py
index f2e82d6..1e7cd35 100755
--- a/hw/ip/otbn/util/analyze_information_flow.py
+++ b/hw/ip/otbn/util/analyze_information_flow.py
@@ -50,6 +50,13 @@
if args.verbose:
print('Control-flow graph:')
print(graph.pretty(program, indent=2))
+ cycle_pcs = graph.get_cycle_starts()
+ if cycle_pcs:
+ print('Control flow has cycles starting at the following PCs:')
+ for pc in cycle_pcs:
+ symbols = program.get_symbols_for_pc(pc)
+ label_str = ' <{}>'.format(', '.join(symbols)) if symbols else ''
+ print('{:#x}{}'.format(pc, label_str))
# Compute information-flow graph(s).
if args.subroutine is None:
diff --git a/hw/ip/otbn/util/shared/constants.py b/hw/ip/otbn/util/shared/constants.py
index fabbae8..5e4a380 100644
--- a/hw/ip/otbn/util/shared/constants.py
+++ b/hw/ip/otbn/util/shared/constants.py
@@ -3,7 +3,8 @@
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
-from typing import Dict, Optional
+from copy import deepcopy
+from typing import Dict, Iterable, Optional
from .insn_yaml import Insn
from .operand import RegOperandType
@@ -46,8 +47,15 @@
def __contains__(self, gpr: str) -> bool:
return gpr in self.values
- def copy(self) -> 'ConstantContext':
- return ConstantContext(self.values)
+ def __deepcopy__(self, memo) -> 'ConstantContext':
+ return ConstantContext(deepcopy(self.values, memo))
+
+ def includes(self, other: 'ConstantContext') -> 'ConstantContext':
+ '''Returns true iff self contains all key/value pairs in other.'''
+ for k, v in other.values.items():
+ if self.get(k) != v:
+ return False
+ return True
def intersect(self, other: 'ConstantContext') -> 'ConstantContext':
'''Returns a new context with only values on which self/other agree.
@@ -60,6 +68,18 @@
out[k] = v
return ConstantContext(out)
+ def removemany(self, to_remove: Iterable[str]) -> None:
+ '''Remove the given registers from the constant context.
+
+ Useful for cases when, for example, the given registers have been
+ overwritten. Does nothing for registers that are not known constants,
+ or for the special register x0.
+ '''
+ for reg in to_remove:
+ if reg != 'x0':
+ self.values.pop(reg, None)
+
+
def update_insn(self, insn: Insn, op_vals: Dict[str, int]) -> None:
'''Updates to new known constant values GPRs after the instruction.
@@ -95,8 +115,6 @@
# register can no longer be determined; remove it from the constants
# dictionary.
iflow = insn.iflow.evaluate(op_vals, self.values)
- for sink in iflow.all_sinks():
- # Remove from self.values if key exists
- self.values.pop(sink, None)
+ self.removemany(iflow.all_sinks())
self.values.update(new_values)
diff --git a/hw/ip/otbn/util/shared/control_flow.py b/hw/ip/otbn/util/shared/control_flow.py
index 7d14102..3dc27c6 100644
--- a/hw/ip/otbn/util/shared/control_flow.py
+++ b/hw/ip/otbn/util/shared/control_flow.py
@@ -72,6 +72,7 @@
is the last PC of the loop body.
'''
def __init__(self, loop_start_pc: int, loop_end_pc: int):
+ super().__init__(loop_start_pc)
self.loop_start_pc = loop_start_pc
self.loop_end_pc = loop_end_pc
@@ -83,19 +84,30 @@
return '<loop from {:#x}-{:#x}>'.format(self.loop_start_pc,
self.loop_end_pc)
+class Cycle(ControlLoc):
+ '''Represents a control flow that loops back to a previous PC.
-class LoopEnd(ControlLoc):
+ This is not represented as a normal ControlLoc because it's convenient to
+ catch these cyclic control flows rather than follow the jump/branch and
+ potentially cause an infinite loop; with this structure, the control-flow
+ graph remains a DAG.
+ '''
+ @classmethod
+ def is_special(cls) -> bool:
+ return True
+
+ def pretty(self) -> str:
+ return '<cycle: back to {:#x}>'.format(self.pc)
+
+class LoopEnd(Cycle):
'''Represents the end of a loop (looping back to the start).
Contains the LoopStart instance we're looping back to.
'''
def __init__(self, loop_start: LoopStart):
+ super().__init__(loop_start.loop_start_pc)
self.loop_start = loop_start
- @classmethod
- def is_special(cls) -> bool:
- return True
-
def pretty(self) -> str:
return '<loop end: back to {:#x}>'.format(
self.loop_start.loop_start_pc)
@@ -108,19 +120,18 @@
The `graph` dictionary maps PCs to 2-item tuples. Not all PCs are in
`graph`; only ones that are jumped, branched, or looped to in the control
- flow starting at `start`.
-
- The first item in the tuple for each PC is the CodeSection that starts at
- the PCs. It is guaranteed to be 0 or more straightline instructions,
- followed by a final non-straightline instruction.
-
- The second item in the tuple for each PC is a list of the various
- control-flow locations that can be reached *after* the CodeSection has run.
- For example, a CodeSection ending in a BNE instruction would have two
- ControlLocs in its list.
+ flow starting at `start`. The 2-item tuple for each PC is as follows:
+ - 0: the CodeSection that starts at the PC. It is guaranteed to be 0
+ or more straightline instructions, followed by a final
+ non-straightline instruction.
+ - 1: a list of the various control-flow locations that can be reached
+ *after* the CodeSection has run. For example, a CodeSection ending
+ in a BNE instruction would have two ControlLocs in its list, and a
+ CodeSection ending in `ret` would have a single Ret() instance in its
+ list.
'''
- def __init__(self, start: int, graph: Dict[int, Tuple[CodeSection,
- List[ControlLoc]]]):
+ def __init__(self, start: int,
+ graph: Dict[int, Tuple[CodeSection,List[ControlLoc]]]):
for pc, (sec, _) in graph.items():
assert sec.start == pc
self.start = start
@@ -153,27 +164,14 @@
'''
return self.get_entry(pc)[1]
- def get_cycles(self,
- pc: int,
- path: List[int] = []) -> Dict[int, List[int]]:
- '''Returns any (non LOOP/LOOPI) control-flow cycles in this graph.'''
- cycles: Dict[int, List[int]] = {}
- sec, edges = self.get_entry(pc)
- for loc in edges:
- if isinstance(loc, LoopStart):
- pc = loc.loop_start_pc
- elif loc.is_special():
- # Skip Ret, Ecall, ImemEnd, LoopEnd
- continue
- else:
- pc = loc.pc
- if pc in path:
- # Found a cycle; report it
- cycles.setdefault(pc, []).append(sec.end)
- else:
- pcs = list(range(sec.start, sec.end + 4, 4))
- cycles.update(self.get_cycles(pc, path + pcs))
- return cycles
+ def get_cycle_starts(self) -> Set[int]:
+ '''Returns start PCs of all marked cycles in the graph.'''
+ out = set()
+ for pc, entry in self.graph.items():
+ for edge in entry[1]:
+ if isinstance(edge, Cycle):
+ out.add(edge.pc)
+ return out
def _pretty_lines(self,
program: OTBNProgram,
@@ -188,13 +186,16 @@
'''
out = []
sec, edges = self.get_entry(entry_pc)
- symbols = program.get_symbols_for_pc(entry_pc)
+ symbols = [s for s in program.get_symbols_for_pc(entry_pc) if s != '']
pcs_to_print = list(range(sec.start, sec.end + 4, 4))
- if len(symbols) > 0:
+ if entry_pc in already_printed and not concise:
+ if len(symbols) > 0:
+ out.append((0, '<{}> (see above)'.format(', '.join(symbols))))
+ else:
+ out.append((0, '{:#x}..{:#x} (see above)'.format(entry_pc, sec.end)))
+ pcs_to_print = []
+ elif len(symbols) > 0:
out.append((0, '<{}>'.format(', '.join(symbols))))
- if entry_pc in already_printed and not concise:
- out.append((0, '(see above)'))
- pcs_to_print = []
if concise:
# only print last insn
out.append((0, '...'))
@@ -212,12 +213,15 @@
if not (loc.is_special() or isinstance(loc, LoopEnd))
]
child_indent = 0 if len(non_special_edges) <= 1 else 2
+ if any(map(lambda loc: isinstance(loc, LoopEnd), edges)):
+ child_indent = -2
for loc in edges:
if isinstance(loc, LoopEnd):
out.append((0, loc.pretty()))
continue
if len(non_special_edges) > 1:
- out.append((0, '->'))
+ last_insn = program.get_insn(sec.end)
+ out.append((0, '-> (branch from {:#x}: {})'.format(sec.end, last_insn.mnemonic)))
if loc.is_special():
out.append((child_indent, loc.pretty()))
if isinstance(loc, Ret) and len(call_stack) > 0:
@@ -227,7 +231,7 @@
program, call_stack[0], concise,
already_printed, call_stack[1:])]
if isinstance(loc, LoopStart):
- out += [(indent + child_indent, line)
+ out += [(indent + child_indent + 2, line)
for indent, line in self._pretty_lines(
program, loc.loop_start_pc, concise,
already_printed, call_stack)]
@@ -315,6 +319,9 @@
checks only for the innermost loop's end PC; if the loops are incorrectly
nested, it will not warn. However, it will raise a RuntimeError if the loop
stack size (8) is exceeded.
+
+ This function will not record cycles in the control graph with the Cycle
+ instance; run _fix_cycles afterward.
'''
LOOP_STACK_SIZE = 8
@@ -345,6 +352,8 @@
operands = program.get_operands(pc)
section = CodeSection(start_pc, pc)
locs = _get_next_control_locations(insn, operands, pc)
+ if start_pc in graph.graph:
+ raise RuntimeError
graph.graph[start_pc] = (section, locs)
# Recurse on next control-flow locations.
@@ -386,6 +395,47 @@
program.max_pc()), [ImemEnd()])
return
+def _label_cycles(program: OTBNProgram, graph: ControlGraph, start_pc: int, visited_pcs: Set[int]) -> None:
+ '''Creates Cycle edges to remove cyclic control flow from the graph.
+
+ Modifies graph in place. Works by replacing edges that loop back to an
+ already-traversed part of the control flow with a Cycle instance.
+ '''
+ sec, edges = graph.get_entry(start_pc)
+ for pc in sec:
+ visited_pcs.add(pc)
+ for i in range(len(edges)):
+ edge = edges[i]
+ if isinstance(edge, Ret) or isinstance(edge, ImemEnd) or isinstance(edge, Ecall):
+ # Cannot possibly loop back
+ continue
+ elif isinstance(edge, Cycle):
+ # Done; no need to traverse or replace
+ continue
+ if edge.pc in visited_pcs:
+ # Replace with Cycle instance and do not traverse
+ edges[i] = Cycle(edge.pc)
+ else:
+ _label_cycles(program, graph, edge.pc, visited_pcs.copy())
+
+
+def _fix_cycles(program: OTBNProgram, graph: ControlGraph) -> None:
+ '''Labels cycles and splits them from other CodeSections in the graph.
+
+ Modifies graph in place.
+ '''
+ _label_cycles(program, graph, graph.start, set())
+ cycle_start_pcs = graph.get_cycle_starts()
+ new_entries: Dict[int,Tuple[CodeSection,List[ControlLoc]]] = {}
+ for start_pc in graph.graph:
+ sec, edges = graph.get_entry(start_pc)
+ for pc in sec:
+ if pc in cycle_start_pcs and pc != sec.start:
+ # Split this section and create a new edge leading to the cycle
+ new_entries[start_pc] = (CodeSection(start_pc, pc), [ControlLoc(pc)])
+ break
+ graph.graph.update(new_entries)
+
def _make_control_graph(program: OTBNProgram, start_pc: int) -> ControlGraph:
'''Constructs a control flow graph with start_pc as the entrypoint.
@@ -394,6 +444,7 @@
'''
graph = ControlGraph(start_pc, {})
_populate_control_graph(graph, program, start_pc, [])
+ _fix_cycles(program, graph)
return graph
diff --git a/hw/ip/otbn/util/shared/information_flow.py b/hw/ip/otbn/util/shared/information_flow.py
index d72e884..b810ece 100644
--- a/hw/ip/otbn/util/shared/information_flow.py
+++ b/hw/ip/otbn/util/shared/information_flow.py
@@ -2,6 +2,7 @@
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
+from copy import deepcopy
from typing import Dict, Iterable, List, Optional, Sequence, Set
from serialize.parse_helpers import check_keys, check_list, check_str
@@ -143,7 +144,7 @@
# Updating a nonexistent graph with another graph should return the
# other graph; since we need to modify self, we change this graph's
# flow to match other's.
- self.flow = other.flow.copy()
+ self.flow = deepcopy(other.flow)
self.exists = other.exists
return
@@ -201,6 +202,52 @@
return InformationFlowGraph(flow)
+ def __deepcopy__(self, memo):
+ flow = deepcopy(self.flow, memo)
+ return InformationFlowGraph(flow, self.exists)
+
+ def loop(self, max_iterations=1000) -> 'InformationFlowGraph':
+ '''Returns graph representing all possible repetitions of seq() of this
+ graph with itself.
+
+ The graph will be the combination of all possible chainings of this
+ graph with itself, including 0 chainings (i.e. an empty graph). The
+ result will be equivalent to:
+
+ g = InformationFlowGraph({})
+ for _ in range(infinity):
+ g.update(g.seq(g))
+
+ It is important to ensure that the total number of nodes in the set is
+ limited, otherwise the computation could infinite-loop; the computation
+ stops when the update becomes a no-op (signalling that the graph has
+ stabilized), which is only guaranteed to happen if the node set is
+ finite. By default, an error will be produced after a certain maximum
+ number of iterations, but the caller can override this by setting
+ `max_iterations` to None.
+
+ Returns a new graph; the input graph is unmodified.
+ '''
+ if not self.exists:
+ # Looping a nonexistent graph results in a nonexistent graph.
+ return InformationFlowGraph.nonexistent()
+
+ graph = InformationFlowGraph.empty()
+ ctr = 0
+ while (max_iterations is None or ctr < max_iterations):
+ old = deepcopy(graph)
+ graph.update(graph.seq(self))
+ if (old == graph):
+ # Graph has stabilized; further iterations will not change it.
+ return graph
+ ctr += 1
+
+ raise RuntimeError(
+ 'Maximum number of iterations ({}) exceeded when looping '
+ 'information-flow graph. Is the set of possible nodes larger '
+ 'than the maximum iterations?'
+ .format(max_iterations))
+
def pretty(self, indent: int = 0) -> str:
'''Return a human-readable representation of the graph.'''
if not self.exists:
@@ -221,6 +268,14 @@
sink))
return '\n'.join(lines)
+ def __eq__(self, other: 'InformationFlowGraph') -> bool:
+ '''Compare two information flow graphs for equality.
+
+ Two graphs are considered equal iff the underlying flow dictionaries
+ are equal.
+ '''
+ return self.flow == other.flow
+
class InsnInformationFlowNode:
'''Represents an information flow node whose value may depend on operands.
diff --git a/hw/ip/otbn/util/shared/information_flow_analysis.py b/hw/ip/otbn/util/shared/information_flow_analysis.py
index 10f5fdc..74bc1a0 100755
--- a/hw/ip/otbn/util/shared/information_flow_analysis.py
+++ b/hw/ip/otbn/util/shared/information_flow_analysis.py
@@ -3,6 +3,7 @@
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
+from copy import deepcopy
from typing import Dict, List, Optional, Set, Tuple
from .cache import Cache, CacheEntry
@@ -25,14 +26,18 @@
# constants: known constants that are in common between all "return"
# iflow paths (i.e. the constants that a caller can always
# rely on)
+# cycles: TODO: document
# control deps: a dictionary whose keys are the information-flow nodes
# whose values at the start PC influence the control flow of
# the program; the value is a set of PCs of control-flow
# instructions through which the node influences the control
# flow
-IFlowResult = Tuple[Set[str], InformationFlowGraph, InformationFlowGraph,
- ConstantContext, Dict[str, Set[int]]]
-
+IFlowResult = Tuple[Set[str],
+ InformationFlowGraph,
+ InformationFlowGraph,
+ Optional[ConstantContext],
+ Dict[int,InformationFlowGraph],
+ Dict[str, Set[int]]]
class IFlowCacheEntry(CacheEntry[ConstantContext, IFlowResult]):
'''Represents an entry in the cache for _get_iflow.
@@ -45,10 +50,7 @@
should not change.
'''
def is_match(self, constants: ConstantContext) -> bool:
- for k, v in self.key.values.items():
- if constants.get(k) != v:
- return False
- return True
+ return constants.includes(self.key)
class IFlowCache(Cache[int, ConstantContext, IFlowResult]):
@@ -158,8 +160,7 @@
used_constants, insn_iflow = _build_iflow_insn(insn, op_vals, pc,
constants)
- for const in used_constants:
- constant_deps.update(iflow.sources(const))
+ constant_deps.update(iflow.sources_for_any(used_constants))
# Compose iflow with the information flow from this instruction
iflow = iflow.seq(insn_iflow)
@@ -191,13 +192,13 @@
result: IFlowResult, cache: IFlowCache) -> None:
'''Updates the cache for _get_iflow.'''
used_constants = result[0]
- used_constant_values = {}
+ used_constant_values = ConstantContext.empty()
for name in used_constants:
- assert name in constants
- used_constant_values[name] = constants.values[name]
+ value = constants.get(name)
+ assert value is not None
+ used_constant_values.set(name, value)
- cache.add(pc, IFlowCacheEntry(ConstantContext(used_constant_values),
- result))
+ cache.add(pc, IFlowCacheEntry(used_constant_values, result))
return
@@ -225,30 +226,53 @@
def _get_iflow_update_state(
- rec_result: IFlowResult, iflow: InformationFlowGraph,
- program_end_iflow: InformationFlowGraph, used_constants: Set[str],
+ rec_result: IFlowResult,
+ iflow: InformationFlowGraph,
+ program_end_iflow: InformationFlowGraph,
+ used_constants: Set[str],
+ constants: ConstantContext,
+ cycles: Dict[int,InformationFlowGraph],
control_deps: Dict[str, Set[int]]) -> InformationFlowGraph:
'''Update the internal state of _get_iflow after a recursive call.
- The `used_constants` and `control_deps` state elements are updated in
- place, but the new `program_end_iflow` is returned. The `iflow` input is
- not modified.
+ `iflow` is not modified.
+ `program_end_iflow` is updated in-place.
+ `used_constants` is updated in-place.
+ `constants` is updated in-place.
+ `cycles` is updated in-place.
+ `control_deps` is updated in-place.
+
+ The new information-flow graph for paths ending in a return to the caller
+ (return_iflow) is composed with `iflow` and returned. The caller will
+ likely need to adjust `iflow` using this value, but how varies by use case.
'''
- rec_used_constants, _, rec_program_end_iflow, _, rec_control_deps = rec_result
+ rec_used_constants, rec_return_iflow, rec_end_iflow, rec_constants, rec_cycles, rec_control_deps = rec_result
# Update the used constants and control-flow dependencies
- for const in rec_used_constants:
- used_constants.update(iflow.sources(const))
+ used_constants.update(iflow.sources_for_any(rec_used_constants))
_update_control_deps(control_deps, iflow, rec_control_deps)
+ # Update the cycles
+ for pc, cycle_iflow in rec_cycles.items():
+ cycles.setdefault(pc, InformationFlowGraph.nonexistent()).update(iflow.seq(cycle_iflow))
+
+ # Update the constants to keep only those that are either unmodified in the
+ # return-path information flow or returned by the recursive call.
+ constants.removemany(rec_return_iflow.all_sinks())
+ if rec_constants is not None:
+ constants.values.update(rec_constants.values)
+
# Update information flow results for paths where the program ends
- program_end_iflow.update(iflow.seq(rec_program_end_iflow))
+ program_end_iflow.update(iflow.seq(rec_end_iflow))
- return program_end_iflow
+ return iflow.seq(rec_return_iflow)
-def _get_iflow(program: OTBNProgram, graph: ControlGraph, start_pc: int,
- start_constants: ConstantContext, loop_end_pc: Optional[int],
+def _get_iflow(program: OTBNProgram,
+ graph: ControlGraph,
+ start_pc: int,
+ start_constants: ConstantContext,
+ loop_end_pc: Optional[int],
cache: IFlowCache) -> IFlowResult:
'''Gets the information-flow graphs for paths starting at start_pc.
@@ -266,7 +290,7 @@
if cached is not None:
return cached
- constants = start_constants.copy()
+ constants = deepcopy(start_constants)
# The combined information flow for all paths leading to the end of the
# subroutine (i.e. a RET, not counting RETS that happen after jumps within
@@ -282,6 +306,15 @@
# flow (and the PCs of the control-flow instructions they influence)
control_deps: Dict[str, Set[int]] = {}
+ # Cycle starts that are accessible from this PC.
+ cycles: Dict[int,InformationFlowGraph] = {}
+
+ # If this PC is the start of a cycle, then initialize the information flow
+ # for the cycle with an empty graph (since doing nothing is a valid
+ # traversal of the cycle).
+ if start_pc in graph.get_cycle_starts():
+ cycles[start_pc] = InformationFlowGraph.empty()
+
section = graph.get_section(start_pc)
edges = graph.get_edges(start_pc)
@@ -290,7 +323,6 @@
# end of the loop like a RET instruction.
if loop_end_pc is not None and loop_end_pc in section:
assert loop_end_pc == section.end
- loop_end_pc = None
edges = [Ret()]
# Get the information flow, used constants, and known constants at the end
@@ -309,8 +341,7 @@
last_insn, last_op_vals, section.end, constants)
# Update used constants to include last instruction
- for const in last_insn_used_constants:
- used_constants.update(iflow.sources(const))
+ used_constants.update(last_insn_iflow.sources_for_any(last_insn_used_constants))
# Update control_deps to include last instruction
last_insn_control_deps = {
@@ -323,49 +354,35 @@
iflow = iflow.seq(last_insn_iflow)
if last_insn.mnemonic in ['loopi', 'loop']:
- # Special handling for loops; reject non-constant #iterations, and run
- # loop body #iterations times
- iterations = _get_constant_loop_iterations(last_insn, last_op_vals,
- constants)
- if iterations is None:
- raise ValueError(
- 'LOOP instruction on a register that is not a '
- 'known constant at PC {:#x} (known constants: {}). If '
- 'the register is in fact a constant, you may need to '
- 'add constant-tracking support for more instructions.'.format(
- section.end, constants.values.keys()))
-
# A loop instruction should result in exactly one edge of type
# LoopStart; check that assumption before we rely on it
assert len(edges) == 1 and isinstance(edges[0], LoopStart)
body_loc = edges[0]
+ # Try to get the number of loop iterations.
+ iterations = _get_constant_loop_iterations(last_insn, last_op_vals,
+ constants)
+
# Update the constants to include the loop instruction
constants.update_insn(last_insn, last_op_vals)
- # Recursive calls for each iteration
- for _ in range(iterations):
- body_result = _get_iflow(program, graph, body_loc.loop_start_pc,
- constants, body_loc.loop_end_pc, cache)
+ if iterations is not None:
+ # If the number of iterations is constant, perform recursive calls
+ # for each iteration
+ for _ in range(iterations):
+ body_result = _get_iflow(program, graph, body_loc.loop_start_pc,
+ constants, body_loc.loop_end_pc, cache)
- # Update program_end_iflow, used constants, and control_deps
- program_end_iflow = _get_iflow_update_state(
- body_result, iflow, program_end_iflow, used_constants,
- control_deps)
+ iflow = _get_iflow_update_state(
+ body_result, iflow, program_end_iflow, used_constants, constants, cycles, control_deps)
- # Update constants and get information flow for paths that loop
- # back to the start ("return")
- _, body_return_iflow, _, constants, _ = body_result
+ # Set the next edges to the instruction after the loop ends
+ edges = [ControlLoc(body_loc.loop_end_pc + 4)]
- # Compose current iflow with the flow for paths that hit the end of
- # the loop
- iflow = iflow.seq(body_return_iflow)
-
- # Set the next edges to the instruction after the loop ends
- edges = [ControlLoc(body_loc.loop_end_pc + 4)]
elif last_insn.mnemonic == 'jal' and last_op_vals['grd'] == 1:
# Special handling for jumps; recursive call for jump destination, then
# continue at pc+4
+ constants.update_insn(last_insn, last_op_vals)
# Jumps should produce exactly one non-special edge; check that
# assumption before we rely on it
@@ -373,15 +390,15 @@
jump_loc = edges[0]
jump_result = _get_iflow(program, graph, jump_loc.pc, constants, None,
cache)
-
- # Update program_end_iflow, used constants, and control_deps
- program_end_iflow = _get_iflow_update_state(jump_result, iflow,
+ iflow = _get_iflow_update_state(jump_result, iflow,
program_end_iflow,
used_constants,
+ constants,
+ cycles,
control_deps)
- # Update constants and get information flow for return paths
- _, jump_return_iflow, _, constants, _ = jump_result
+ # Get information flow for return paths
+ _, jump_return_iflow, _, _, _, _ = jump_result
# Compose current iflow with the flow for the jump's return paths
iflow = iflow.seq(jump_return_iflow)
@@ -399,11 +416,9 @@
if isinstance(loc, Ecall) or isinstance(loc, ImemEnd):
# Ecall or ImemEnd nodes are expected to be the only edge
assert len(edges) == 1
- # Clear common constants; there are no return branches here
- common_constants = ConstantContext.empty()
program_end_iflow.update(iflow)
elif isinstance(loc, Ret):
- if loop_end_pc is not None:
+ if loop_end_pc is not None and loop_end_pc != section.end:
raise RuntimeError(
'RET before end of loop at PC {:#x} (loop end PC: '
'{:#x})'.format(section.end, loop_end_pc))
@@ -412,46 +427,80 @@
# Since this is the only edge, common_constants must be unset
common_constants = constants
return_iflow.update(iflow)
- elif isinstance(loc, LoopStart) or isinstance(loc, LoopEnd):
- # We shouldn't hit a loop instances here; those cases (a loop
- # instruction or the end of a loop) are all handled earlier
- raise RuntimeError(
- 'Unexpected loop edge (type {}) at PC {:#x}'.format(
- type(loc), section.end))
- elif not loc.is_special():
+ elif isinstance(loc, Cycle):
+ # Add the flow from start PC to this cyclic PC to the existing
+ # flow, if any.
+ cycles.setdefault(loc.pc, InformationFlowGraph.nonexistent()).update(iflow)
+ elif isinstance(loc, LoopStart) or not loc.is_special():
# Just a normal PC; recurse
result = _get_iflow(program, graph, loc.pc, constants, loop_end_pc,
cache)
- # Update program_end_iflow, used constants, and control_deps
- program_end_iflow = _get_iflow_update_state(
- result, iflow, program_end_iflow, used_constants, control_deps)
-
- # Get information flow for return paths and new constants
- _, rec_return_iflow, _, rec_constants, _ = result
+ # Defensively copy constants so they don't cross between branches
+ local_constants = deepcopy(constants)
+ rec_return_iflow = _get_iflow_update_state(
+ result, iflow, program_end_iflow, used_constants, local_constants, cycles, control_deps)
# Take values on which existing and recursive constants agree
if common_constants is None:
- common_constants = rec_constants
+ common_constants = local_constants
else:
- common_constants.intersect(rec_constants)
+ common_constants.intersect(local_constants)
- # Update return_iflow with the current iflow composed with return
- # paths
- return_iflow.update(iflow.seq(rec_return_iflow))
+ # Update return_iflow with the current iflow composed with return
+ # paths
+ return_iflow.update(rec_return_iflow)
else:
raise RuntimeError(
'Unexpected next control location type at PC {:#x}: {}'.format(
section.end, type(loc)))
- # There should be at least one edge, and all edges should set
- # common_constants to some non-None value
- assert common_constants is not None
-
# Update used_constants to include any constant dependencies of
# common_constants, since common_constants will be cached
- used_constants.update(
- return_iflow.sources_for_any(common_constants.values.keys()))
+ if common_constants is not None:
+ used_constants.update(return_iflow.sources_for_any(common_constants.values.keys()))
+
+ # If this PC is the start of one of the cycles we're currently processing,
+ # see if it can be finalized.
+ if start_pc in cycles:
+ cycle_iflow = cycles[start_pc]
+
+ # Find which constants are "stable" (unmodified throughout all paths in
+ # the cycle)
+ stable_constants = ConstantContext.empty()
+ for k, v in start_constants.values.items():
+ if k not in cycle_iflow.all_sinks():
+ stable_constants.set(k, v)
+
+ # If any start constants were modified during the cycle; do a recursive
+ # call with only the unmodified constants, and return that.
+ if not stable_constants.includes(start_constants):
+ return _get_iflow(program, graph, start_pc, stable_constants, loop_end_pc, cache)
+
+ # If all starting constants were stable, just loop() the information
+ # flow graph for this cycle to get the combined flow for all paths that
+ # cycle back to this point (including no cycling, i.e. the empty graph).
+ cycle_iflow = cycle_iflow.loop()
+
+ # The final information flow for all paths is the graph of any valid
+ # traversals of the cycle, sequentially composed with the return or
+ # end-program paths from here.
+ return_iflow = cycle_iflow.seq(return_iflow)
+ program_end_iflow = cycle_iflow.seq(program_end_iflow)
+ for pc in cycles:
+ if pc == start_pc:
+ continue
+ cycles[pc] = cycle_iflow.seq(cycles[pc])
+
+ # Control-flow dependencies must also be updated to include the
+ # dependencies stemming from any valid traversal of the cycle
+ new_control_deps = {}
+ _update_control_deps(new_control_deps, cycle_iflow, control_deps)
+ control_deps = new_control_deps
+
+
+ # Remove this cycle from the cycles dict, since it is now resolved.
+ del cycles[start_pc]
# Strip special register x0 from both sources and sinks of graphs returned.
return_iflow.remove_source('x0')
@@ -461,32 +510,12 @@
control_deps.pop('x0', None)
# Update the cache and return
- out = (used_constants, return_iflow, program_end_iflow, common_constants,
+ out = (used_constants, return_iflow, program_end_iflow, common_constants, cycles,
control_deps)
_get_iflow_cache_update(start_pc, start_constants, out, cache)
return out
-def check_acyclic(graph: ControlGraph) -> None:
- '''Checks for (non LOOP/LOOPI) cycles in control-flow graph.
-
- If there are such cycles, we need to raise an error and not proceed; the
- control-flow traversal would infinite-loop.
- '''
- cycles = graph.get_cycles(graph.start)
- if cycles:
- msg = [
- 'One or more cycles found in control-flow graph at the following PCs:'
- ]
- for pc, links in cycles.items():
- msg.append('{:#x} <-> {}'.format(
- pc, ','.join(['{:#x}'.format(link) for link in links])))
- msg.append('Analyzing cyclic control flow outside of LOOP/LOOPI '
- 'instructions is not currently supported.')
- raise ValueError('\n'.join(msg))
- return
-
-
def get_subroutine_iflow(program: OTBNProgram, graph: ControlGraph,
subroutine_name: str) -> SubroutineIFlow:
'''Gets the information-flow graphs for the subroutine.
@@ -500,10 +529,14 @@
3. The information-flow nodes whose values at the start of the subroutine
influence its control flow.
'''
- check_acyclic(graph)
start_pc = program.get_pc_at_symbol(subroutine_name)
- _, ret_iflow, end_iflow, _, control_deps = _get_iflow(
+ _, ret_iflow, end_iflow, _, cycles, control_deps = _get_iflow(
program, graph, start_pc, ConstantContext.empty(), None, IFlowCache())
+ if cycles:
+ for pc in cycles:
+ print(cycles[pc].pretty())
+ print('---')
+ raise RuntimeError('Unresolved cycles; start PCs: {}'.format(', '.join(['{:#x}'.format(pc) for pc in cycles.keys()])))
if not (ret_iflow.exists or end_iflow.exists):
raise ValueError('Could not find any complete control-flow paths when '
'analyzing subroutine.')
@@ -520,10 +553,10 @@
2. The information-flow nodes whose values at the start of the subroutine
influence its control flow.
'''
- check_acyclic(graph)
- _, ret_iflow, end_iflow, _, control_deps = _get_iflow(
- program, graph, program.min_pc(), ConstantContext.empty(), None,
- IFlowCache())
+ _, ret_iflow, end_iflow, _, cycles, control_deps = _get_iflow(
+ program, graph, program.min_pc(), ConstantContext.empty(), None, IFlowCache())
+ if cycles:
+ raise RuntimeError('Unresolved cycles; start PCs: {}'.format(', '.join(['{:#x}'.format(k) for k in cycles.keys()])))
if ret_iflow.exists:
# No paths from imem_start should end in RET
raise ValueError('Unexpected information flow for paths ending in RET '