| # Copyright lowRISC contributors. |
| # Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| from typing import Dict, Iterator, List, Optional, Sequence, Set |
| |
| from serialize.parse_helpers import check_keys, check_list, check_str |
| |
| from .operand import Operand, RegOperandType |
| |
| FLAG_NAMES = ['c', 'm', 'l', 'z'] |
| SPECIAL_REG_NAMES = ['mod', 'acc'] |
| READONLY = ['x0'] |
| |
| class InformationFlowGraph: |
| '''Represents an information flow graph. |
| |
| The graph is represented as a `flow` dictionary mapping each "sink" node to |
| a set of "source" nodes. Nodes are represented as strings. All nodes which |
| are modified at all in the information flow should be represented as sinks |
| in the `flow` dictionary. If a node does not appear, that means that it is |
| unmodified. |
| |
| Note that it is possible for a sink node to be mapped to an empty set; that |
| means the sink is overwritten with a constant value, and information is not |
| flowing to it from any nodes (including its own previous value). |
| ''' |
| def __init__(self, flow: Dict[str, Set[str]], exists:bool=True): |
| self.flow = flow |
| |
| # Should not be modified directly. See the nonexistent() method |
| # documentation for details of what this flag means. |
| self.exists = exists |
| |
| @staticmethod |
| def empty() -> 'InformationFlowGraph': |
| '''Represents the graph for a path in which nothing is modified. |
| |
| For instance, if a block of code could be executed or not depending on |
| the processor state, then the path where the block is not executed |
| would be an empty() graph. Then, one could use update() to get a graph |
| representing the combination of the two possibilities. |
| ''' |
| return InformationFlowGraph({}) |
| |
| @staticmethod |
| def nonexistent() -> 'InformationFlowGraph': |
| '''Represents the graph for a nonexistent path. |
| |
| There is an important distinction between this and an "empty" graph. In |
| particular, for any graph G, G.update(nonexistent) = G, which is not |
| the case for a merely "empty" graph; since the update() method is |
| combining all possible paths, an empty graph means we need to consider |
| that all nodes might be unmodified, while a nonexistent graph has no |
| possible paths and therefore no effect. |
| |
| A nonexistent graph can be thought of as "None", but handling it |
| directly within the class reduces the need for None checks. |
| |
| For instance, imagine we want to represent the information flow for |
| only paths of a program that end in RET. If no paths from the current |
| point end in RET (because, for instance, all paths end the program with |
| ECALL), then a nonexistent graph would represent the information flow. |
| ''' |
| return InformationFlowGraph({}, False) |
| |
| def sources(self, sink: str) -> Set[str]: |
| '''Returns all sources for the given sink.''' |
| # if the sink does not appear, it is unmodified, meaning its only |
| # source is itself |
| return self.flow.get(sink, {sink}) |
| |
| def sinks(self, source: str) -> Set[str]: |
| '''Returns all sinks for the given source.''' |
| out = set() |
| for sink in self.flow: |
| if source in self.flow[sink]: |
| out.add(sink) |
| if source not in self.flow: |
| # Implicitly, the source is unmodified and depends on itself |
| out.add(source) |
| return out |
| |
| def all_sources(self) -> Set[str]: |
| '''Returns all sources in the graph.''' |
| out : Set[str] = set() |
| return out.union(*self.flow.values()) |
| |
| def all_sinks(self) -> Set[str]: |
| '''Returns all sinks in the graph.''' |
| return set(self.flow.keys()) |
| |
| def sources_for_any(self, sinks: Iterator[str]) -> Set[str]: |
| '''Returns all nodes that are a source for any of the given sinks.''' |
| out : Set[str] = set() |
| return out.union(*(self.sources(s) for s in sinks)) |
| |
| def sinks_for_any(self, sinks: Iterator[str]) -> Set[str]: |
| '''Returns all nodes that are a sink for any of the given sources.''' |
| out : Set[str] = set() |
| return out.union(*(self.sinks(s) for s in sinks)) |
| |
| def update(self, other: 'InformationFlowGraph') -> None: |
| '''Updates self to include the information flow from other. |
| |
| Every sink that appears in either or both graphs will, in the updated |
| self, have sources formed from the union of its sources in both graphs. |
| Sinks that appear in only one graph will have sources formed of the |
| union of their sources in the graph in which they appear and themselves |
| (because a sink not appearing in a graph implicitly means the value is |
| unchanged). |
| |
| Important note: updating with an empty graph will not be a no-op. An |
| empty graph implies everything remains unmodified, so combining an |
| empty graph with something that *overwrites* a value (i.e. a graph with |
| a sink whose sources do not include itself) will add the "value doesn't |
| change" information flow (i.e. the sink will be added to its own |
| sources). |
| |
| Does not modify other. |
| ''' |
| if not other.exists: |
| # If the other graph is nonexistent, then this is a no-op. |
| return |
| |
| if not self.exists: |
| # Updating a nonexistent graph with another graph should return the |
| # other graph; since we need to modify self, we change this graph's |
| # flow to match other's. |
| self.flow = other.flow.copy() |
| self.exists = other.exists |
| return |
| |
| for sink, sources in other.flow.items(): |
| if sink not in self.flow: |
| # implicitly, a non-updated value depends only on itself (NOT |
| # an empty set, which would indicate a value that is |
| # overwritten with a constant) |
| self.flow[sink] = {sink} |
| self.flow[sink].update(sources) |
| |
| return |
| |
| def seq(self, other: 'InformationFlowGraph') -> 'InformationFlowGraph': |
| '''Performs sequential composition of information flow graphs. |
| |
| Returns the information flow graph that results from self sequentially |
| composed with other. For example, if these are the two graphs (b and c |
| being the sinks of self and e and d being the sinks of other): |
| |
| self: other: |
| a,d -> b b -> e |
| a,b -> c f -> d |
| b,c -> c |
| |
| ...then the result of this operation will be: |
| |
| a,d -> e |
| f -> d |
| a,b,d -> c |
| |
| Defensively copies all source sets for the new graph. |
| ''' |
| if not self.exists or not other.exists: |
| # If either this or the other graph is nonexistent, then the |
| # sequence is nonexistent. |
| return InformationFlowGraph.nonexistent() |
| |
| flow = {} |
| for sink, sources in other.flow.items(): |
| new_sources = set() |
| for source in sources: |
| if source in self.flow: |
| new_sources.update(self.flow[source]) |
| else: |
| # source is not a sink in self's flow; assume it stays |
| # constant |
| new_sources.add(source) |
| flow[sink] = new_sources |
| |
| for sink, sources in self.flow.items(): |
| if sink not in flow: |
| # sink is not updated in other's flow |
| flow[sink] = sources.copy() |
| |
| return InformationFlowGraph(flow) |
| |
| def pretty(self, indent: int = 0) -> str: |
| '''Return a human-readable representation of the graph.''' |
| if not self.exists: |
| return 'Nonexistent information-flow graph (no possible paths).' |
| |
| prefix = ' ' * indent |
| flow_strings = { |
| sink: ','.join(sorted(sources)) |
| for sink, sources in self.flow.items() |
| } |
| max_source_chars = max([len(s) for s in flow_strings.values()], default=0) |
| lines = [] |
| for sink in sorted(self.flow.keys()): |
| sources_str = flow_strings[sink] |
| padding = ' ' * (max_source_chars - len(sources_str)) |
| lines.append('{}{}{} -> {}'.format(prefix, sources_str, padding, |
| sink)) |
| return '\n'.join(lines) |
| |
| class InsnInformationFlowNode: |
| '''Represents an information flow node whose value may depend on operands. |
| ''' |
| def required_constants(self, op_vals: Dict[str, int]) -> Set[str]: |
| '''Returns the names of regs that must be constant for `evaluate()`. |
| |
| For instance, for an indirect reference of a WDR via a GPR, the GPR's |
| value must be constant for the node to be evaluated. Subclasses that |
| require constants override this method. |
| ''' |
| return set() |
| |
| def evaluate(self, op_vals: Dict[str, int], |
| constant_regs: Dict[str, int]) -> str: |
| '''Determines information flow graph for the instruction. |
| |
| Evaluates the information-flow node according to the given operand |
| values. The `constant_regs` dictionary is only used to access values in |
| `self.required_constants()`; changes to other values will have no |
| effect. |
| ''' |
| raise NotImplementedError() |
| |
| |
| class InsnRegOperandNode(InsnInformationFlowNode): |
| '''Represents a specific register operand for an instruction.''' |
| def __init__(self, op: Operand): |
| if not isinstance(op.op_type, RegOperandType): |
| raise ValueError( |
| 'Attempt to construct a register-operand information flow ' |
| 'node from a non-register operand {} of type {}'.format( |
| op.name, op.op_type)) |
| self.op = op |
| self.is_wide = op.op_type.reg_type == 'wdr' |
| |
| def evaluate(self, op_vals: Dict[str, int], |
| constant_regs: Dict[str, int]) -> str: |
| if self.op.name not in op_vals: |
| raise ValueError( |
| 'Operand {} not found in provided operand values: {}'.format( |
| self.op.name, op_vals.keys())) |
| return self.op.op_type.op_val_to_str(op_vals[self.op.name], None) |
| |
| |
| def _eval_indirect_wdr(gpr: str, constant_regs: Dict[str, int]) -> str: |
| if gpr not in constant_regs: |
| raise RuntimeError( |
| 'Cannot analyze information flow; cannot determine which ' |
| 'WDR is referenced by non-constant GPR {} (constant regs ' |
| 'are: {})'.format(gpr, constant_regs.keys())) |
| wdr_idx = constant_regs[gpr] |
| return 'w{}'.format(wdr_idx) |
| |
| |
| class InsnIndirectWDRNode(InsnInformationFlowNode): |
| '''Represents an indirect reference to a WDR via a GPR.''' |
| def __init__(self, op: Operand): |
| if not isinstance(op.op_type, RegOperandType): |
| raise ValueError( |
| 'Attempt to construct an indirect-WDR operand information ' |
| 'flow node from a non-register operand {} of type {}'.format( |
| op.name, op.op_type)) |
| if not op.op_type.reg_type == 'gpr': |
| raise ValueError( |
| 'Attempt to construct an indirect-WDR operand information ' |
| 'flow node from a non-GPR register operand {} of type {}'. |
| format(op.name, op.op_type.reg_type)) |
| self.op = op |
| |
| def _get_gpr_name(self, op_vals: Dict[str, int]) -> str: |
| if self.op.name not in op_vals: |
| raise ValueError( |
| 'Operand {} not found in provided operand values: {}'.format( |
| self.op.name, op_vals.keys())) |
| return self.op.op_type.op_val_to_str(op_vals[self.op.name], None) |
| |
| def required_constants(self, op_vals: Dict[str, int]) -> Set[str]: |
| return {self._get_gpr_name(op_vals)} |
| |
| def evaluate(self, op_vals: Dict[str, int], |
| constant_regs: Dict[str, int]) -> str: |
| gpr = self._get_gpr_name(op_vals) |
| return _eval_indirect_wdr(gpr, constant_regs) |
| |
| |
| class InsnGroupFlagNode(InsnInformationFlowNode): |
| '''Represents a flag node whose group depends on the flag_group operand.''' |
| def __init__(self, flag: str): |
| flag = flag.lower() |
| if flag not in FLAG_NAMES: |
| raise ValueError( |
| 'Invalid flag name: "{}". Valid names are: {}'.format( |
| flag, FLAG_NAMES)) |
| self.flag = flag |
| self.constant_dependent = False |
| |
| def evaluate(self, op_vals: Dict[str, int], |
| constant_regs: Dict[str, int]) -> str: |
| if 'flag_group' not in op_vals: |
| raise ValueError( |
| 'Operand flag_group not found in provided operand values: {}'. |
| format(op_vals.keys())) |
| return 'fg{}-{}'.format(op_vals['flag_group'], self.flag) |
| |
| |
| class InsnConstantNode(InsnInformationFlowNode): |
| '''Represents instruction node whose value does not depend on operands.''' |
| def __init__(self, node: str): |
| self.node = node |
| self.constant_dependent = False |
| |
| def evaluate(self, op_vals: Dict[str, int], |
| constant_regs: Dict[str, int]) -> str: |
| return self.node |
| |
| |
| def _parse_iflow_nodes( |
| node: str, what: str, |
| operands: List[Operand]) -> List[InsnInformationFlowNode]: |
| '''Parses information flow node(s) from the instruction description. |
| |
| Valid information flow nodes are one of the following: |
| - the name of a *register* operand from the operands list |
| - "wref-<reg operand name>" for instructions where a WDR is indirectly |
| accessed via a GPR; in this case <reg operand name> is the GPR and must |
| be in the operands list |
| - one of the special strings "dmem", "acc", or "mod" |
| - a flag (represented as "<flag group>-<flag>", where <flag group> can |
| be either "fg0", "fg1", or (if the instruction has a flag_group |
| operand) simply "flags" to select the current flag group, and <flag> |
| can be l, c, m, z, or "all". |
| |
| Raises a ValueError if the node does not have a valid format. |
| ''' |
| node = node.lower() |
| |
| # Check if node is a register operand |
| for op in operands: |
| if node == op.name: |
| if not isinstance(op.op_type, RegOperandType): |
| raise RuntimeError( |
| 'Information-flow node {} matches operand name, but ' |
| 'operand type is not a register (type {}). Note that ' |
| 'immediate operands cannot be information-flow nodes.' |
| .format(node, type(op.op_type))) |
| return [InsnRegOperandNode(op)] |
| |
| # Check if node is an indirect reference to a WDR through a GPR |
| if node.startswith('wref-'): |
| gpr = node[len('wref-'):] |
| for op in operands: |
| if gpr == op.name: |
| if not (isinstance(op.op_type, RegOperandType) and op.op_type.reg_type == 'gpr'): |
| raise RuntimeError( |
| 'Operand {} in indirect reference {} is not a GPR ' |
| '(type {}). Only GPRs can be indirect references.' |
| .format(gpr, node, type(op.op_type))) |
| return [InsnIndirectWDRNode(op)] |
| raise RuntimeError( |
| 'Could not find GPR operand corresponding to {} when decoding ' |
| 'indirect reference {}. Operand names: {}' |
| .format(gpr, node, ', '.join([op.name for op in operands]))) |
| |
| # Check if node is a special string |
| if node == 'dmem' or node in SPECIAL_REG_NAMES: |
| return [InsnConstantNode(node)] |
| |
| # Try to interpret node as a flag or set of flags |
| node_split = node.split('-') |
| if len(node_split) != 2: |
| raise ValueError('Cannot parse information flow node {} for {}'.format( |
| node, what)) |
| flag_group_str, flag = node_split |
| |
| # Case where flag group depends on the flag_group operand |
| if flag_group_str == 'flags': |
| if flag == 'all': |
| return [InsnGroupFlagNode(flag) for flag in FLAG_NAMES] |
| return [InsnGroupFlagNode(flag)] |
| elif flag_group_str == 'fg0' or flag_group_str == 'fg1': |
| if flag == 'all': |
| return [ |
| InsnConstantNode('{}-{}'.format(flag_group_str, flag)) |
| for flag in FLAG_NAMES |
| ] |
| return [InsnConstantNode('{}-{}'.format(flag_group_str, flag))] |
| else: |
| raise ValueError('Cannot parse information flow node {} for {}'.format( |
| node, what)) |
| |
| |
| class InsnInformationFlowTest: |
| '''Wrapper class for tests for information-flow rules.''' |
| def __init__(self, as_string: str, operand: str, value: int): |
| self.as_string = as_string |
| self.operand = operand |
| self.value = value |
| |
| def check(self, op_vals: Dict[str, int]) -> bool: |
| # Should be overridden by subclasses |
| raise NotImplementedError() |
| |
| def __repr__(self) -> str: |
| return 'InsnInformationFlowTest: {}'.format(self.as_string) |
| |
| |
| class EqTest(InsnInformationFlowTest): |
| '''Tests that the operand is equal to the value.''' |
| def check(self, op_vals: Dict[str, int]) -> bool: |
| return op_vals[self.operand] == self.value |
| |
| |
| class NotEqTest(InsnInformationFlowTest): |
| '''Tests that the operand is not equal to the value.''' |
| def check(self, op_vals: Dict[str, int]) -> bool: |
| return not (op_vals[self.operand] == self.value) |
| |
| |
| class GeqTest(InsnInformationFlowTest): |
| '''Tests that the operand is greater than or equal to the value.''' |
| def check(self, op_vals: Dict[str, int]) -> bool: |
| return op_vals[self.operand] >= self.value |
| |
| |
| class LeqTest(InsnInformationFlowTest): |
| '''Tests that the operand is less than or equal to the value.''' |
| def check(self, op_vals: Dict[str, int]) -> bool: |
| return op_vals[self.operand] <= self.value |
| |
| |
| class MultiTest(InsnInformationFlowTest): |
| '''Combination of multiple tests.''' |
| def __init__(self, tests: List[InsnInformationFlowTest]): |
| self.tests = tests |
| |
| def check(self, op_vals: Dict[str, int]) -> bool: |
| return all(t.check(op_vals) for t in self.tests) |
| |
| |
| def _parse_iflow_test(test_yml: object, what: str, |
| operands: List[Operand]) -> InsnInformationFlowTest: |
| '''Parses an item in the "test" list of a YAML information-flow rule. |
| |
| Tests are expected to be strings of the form "<operand> <comparison> |
| <value>", where: |
| - <operand> is the name of one of the instruction's operands |
| - <comparison> is "==", "!=", ">=", or "<=" |
| - <value> is an integer that is within the allowed range of values |
| for this operand (for instance, for a flag group it would be 0 or 1, |
| and for a register it would be between 0 and 31). |
| |
| Returns a function that takes operand values as input and returns true if |
| the test passes. |
| ''' |
| test = check_str(test_yml, what) |
| test = test.lower() |
| test_split = test.split(' ') |
| if len(test_split) != 3: |
| raise ValueError( |
| 'Invalid information flow test format (expected "<operand> ' |
| '<comparison> <value>"): got {} (for {})'.format(test, what)) |
| opname, comparison, value_str = test_split |
| |
| opnames = [op.name for op in operands] |
| if opname not in opnames: |
| raise ValueError( |
| 'Invalid information flow test format for {}: operand {} not ' |
| 'found in operands list: {}'.format(what, opname, opnames)) |
| |
| try: |
| value = int(value_str, 0) |
| except ValueError: |
| raise ValueError( |
| 'Value {} in test {} for {} must be an integer.'.format( |
| value_str, test, what)) |
| |
| constructors = { |
| '==' : EqTest, |
| '!=' : NotEqTest, |
| '>=' : GeqTest, |
| '<=' : LeqTest, |
| } |
| constructor = constructors.get(comparison, None) |
| if constructor is None: |
| raise ValueError('Unrecognized comparison {} for {}'.format( |
| comparison, what)) |
| return constructor(test, opname, value) |
| |
| |
| class InsnInformationFlowRule: |
| '''A rule describing a single step of information flowing to one or more |
| nodes from one or more nodes.''' |
| def __init__(self, flows_to: Sequence[InsnInformationFlowNode], |
| flows_from: Sequence[InsnInformationFlowNode], |
| test: InsnInformationFlowTest): |
| self.flows_to = flows_to |
| self.flows_from = flows_from |
| self.test = test |
| |
| def required_constants(self, op_vals: Dict[str, int]) -> Set[str]: |
| '''Returns the names of regs that must be constant for `evaluate()`.''' |
| out = set() |
| for node in self.flows_to: |
| out.update(node.required_constants(op_vals)) |
| for node in self.flows_from: |
| out.update(node.required_constants(op_vals)) |
| return out |
| |
| def evaluate( |
| self, op_vals: Dict[str, int], |
| constant_regs: Dict[str, int]) -> InformationFlowGraph: |
| if not self.test.check(op_vals): |
| # Rule is not triggered |
| return InformationFlowGraph.nonexistent() |
| sources = set() |
| for node in self.flows_from: |
| sources.add(node.evaluate(op_vals, constant_regs)) |
| flow = {} |
| for node in self.flows_to: |
| dest = node.evaluate(op_vals, constant_regs) |
| if dest in READONLY: |
| # No information will actually flow to this node, because it is |
| # not writeable; skip. |
| continue |
| flow[dest] = sources.copy() |
| return InformationFlowGraph(flow) |
| |
| @staticmethod |
| def from_yaml(yml: object, what: str, |
| insn_operands: List[Operand]) -> 'InsnInformationFlowRule': |
| rule = check_keys(yml, what, ['to', 'from'], ['test']) |
| |
| to_what = '"to" list for {}'.format(what) |
| flows_to = [] |
| for node_yml in check_list(rule['to'], to_what): |
| node_what = 'node {} in {}'.format(node_yml, to_what) |
| nodes = _parse_iflow_nodes(check_str(node_yml, node_what), |
| node_what, insn_operands) |
| flows_to.extend(nodes) |
| |
| from_what = '"from" list for {}'.format(what) |
| flows_from = [] |
| for node_yml in check_list(rule['from'], from_what): |
| node_what = 'node {} in {}'.format(node_yml, from_what) |
| nodes = _parse_iflow_nodes(check_str(node_yml, node_what), |
| node_what, insn_operands) |
| flows_from.extend(nodes) |
| |
| test_yml = rule.get('test', None) |
| if test_yml is None: |
| tests = [] |
| else: |
| test_what = 'test field for {}'.format(what) |
| tests = [ |
| _parse_iflow_test(t, test_what, insn_operands) |
| for t in check_list(test_yml, test_what) |
| ] |
| return InsnInformationFlowRule(flows_to, flows_from, MultiTest(tests)) |
| |
| |
| class InsnInformationFlow: |
| '''Represents information flow for a single instruction.''' |
| def __init__(self, rules: List[InsnInformationFlowRule]) -> None: |
| self.rules = rules |
| |
| def required_constants(self, op_vals: Dict[str, int]) -> Set[str]: |
| '''Returns the names of regs that must be constant for `evaluate()`.''' |
| return { |
| const |
| for rule in self.rules |
| for const in rule.required_constants(op_vals) |
| } |
| |
| def evaluate(self, op_vals: Dict[str, int], |
| constant_regs: Dict[str, int]) -> InformationFlowGraph: |
| graph = InformationFlowGraph.nonexistent() |
| for rule in self.rules: |
| rule_graph = rule.evaluate(op_vals, constant_regs) |
| graph.update(rule_graph) |
| |
| return graph |
| |
| @staticmethod |
| def from_yaml(yml: Optional[object], what: str, |
| insn_operands: List[Operand]) -> 'InsnInformationFlow': |
| |
| if yml is None: |
| # Default behavior: all source registers flow to all destination |
| # registers, no other information flow or special registers |
| sources = [ |
| InsnRegOperandNode(op) for op in insn_operands if |
| isinstance(op.op_type, RegOperandType) and op.op_type.is_src() |
| ] |
| dests = [ |
| InsnRegOperandNode(op) for op in insn_operands |
| if isinstance(op.op_type, RegOperandType) and |
| op.op_type.is_dest() |
| ] |
| return InsnInformationFlow( |
| [InsnInformationFlowRule(dests, sources, MultiTest([]))]) |
| |
| rule_list = check_list(yml, what) |
| rules = [] |
| for idx, rule_yml in enumerate(rule_list): |
| rule_what = 'rule at index {} for {}'.format(idx, what) |
| rule = InsnInformationFlowRule.from_yaml(rule_yml, rule_what, |
| insn_operands) |
| rules.append(rule) |
| return InsnInformationFlow(rules) |