| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Copyright lowRISC contributors. |
| # Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| # SPDX-License-Identifier: Apache-2.0 |
| """DIF Status Report Generator. |
| |
| This tool generates a status report for the DIFs by cross referencing the git |
| commit history of each DIF with that of the HW it actuates to provide OpenTitan |
| developers with information about what DIFs require updating. |
| |
| To display usage run: |
| ./check_dif_statuses.py --help |
| """ |
| |
| import argparse |
| import collections |
| import io |
| import itertools |
| import json |
| import logging |
| import re |
| import subprocess |
| import sys |
| from contextlib import redirect_stdout |
| from enum import Enum |
| from pathlib import Path |
| from typing import List |
| |
| import enlighten |
| import gitfame |
| import hjson |
| import pydriller |
| from tabulate import tabulate |
| from termcolor import colored |
| |
| import topgen.lib as lib |
| |
| # Maintain a list of IPs that only exist in the top-level area. |
| # |
| # Note that there are several templated IPs that are auto-generated in the |
| # top-level area as well, but since the bulk of the code (including the |
| # template) lives in the hw/ip area, we do not need to consider them. |
| # These IPs are slowly being migrated to use the `ipgen` tooling, and are |
| # defined in the IPS_USING_IPGEN list in the make_new_dif.ip module imported |
| # above. |
| _TOP_LEVEL_IPS = {"ast", "sensor_ctrl"} |
| |
| # Indicates that the DIF work has not yet started. |
| _NOT_STARTED = colored("NOT STARTED", "red") |
| |
| # This file is $REPO_TOP/util/check_dif_statuses.py, so it takes two parent() |
| # calls to get back to the top. |
| REPO_TOP = Path(__file__).resolve().parent.parent |
| |
| # Define the DIF library relative to REPO_TOP. |
| DIFS_RELATIVE_PATH = Path("sw/device/lib/dif") |
| |
| |
| class _OTComponent(Enum): |
| """Type of OpenTitan component.""" |
| DIF = 1 |
| HW = 2 |
| |
| |
| class DIFStatus: |
| """Holds all DIF status information for displaying. |
| |
| Attributes: |
| dif_name (str): Full name of the DIF including the IP name. |
| ip (str): Name of the IP the DIF is associated with. |
| dif_path (Path): Path to the DIF code (relative to REPO_TOP). |
| hw_path (Path): Path to the HW RTL associated with this DIF. |
| dif_last_modified (datetime): Date and time the DIF was last modified. |
| hw_last_modified (datetime): Date and time the HW was last modified. |
| dif_main_contributors (List[str]): List of emails of DIF contributors. |
| hw_main_constributors (List[str]): List of emails of HW contributors. |
| lifecycle_state (str): Lifecycle state string (e.g., S0, S1, ...). |
| num_functions_defined (int): Number of API functions defined. |
| num_functions_implemented (int): Number of API functions implemented. |
| api_complete (bool): Indicates if DIF implements all defined functions. |
| funcs_unimplemented (Set[str]): Set of unimplemted DIF functions. |
| |
| """ |
| def __init__(self, ipgen_ips, top_level, dif_name): |
| """Mines metadata to populate this DIFStatus object. |
| |
| Args: |
| ipgen_ips: List of IPs generated with the ipgen.py tool. |
| top_level: Name of the top level design. |
| dif_name: Full name of the DIF including the IP name. |
| |
| Raises: |
| ValueError: Raised if DIF name does not start with "dif_". |
| """ |
| # Get DIF/IP names and path. |
| if not dif_name.startswith("dif_"): |
| raise ValueError("DIF name should start with \"dif_\".") |
| self.dif_name = dif_name |
| self.ip = self.dif_name[4:] |
| self.dif_path = DIFS_RELATIVE_PATH / dif_name |
| self.dif_autogen_path = (DIFS_RELATIVE_PATH / |
| f"autogen/{dif_name}_autogen") |
| |
| # Check if header file exists - if not then its not even begun. |
| has_started = self.dif_path.with_suffix(".h").is_file() |
| |
| # Get (relative) HW RTL path. |
| if self.ip in ipgen_ips: |
| self.hw_path = Path(f"hw/{top_level}/ip_autogen/{self.ip}") |
| elif self.ip in _TOP_LEVEL_IPS: |
| self.hw_path = Path(f"hw/{top_level}/ip/{self.ip}") |
| else: |
| self.hw_path = Path(f"hw/ip/{self.ip}") |
| |
| # Indicates DIF API completeness. |
| self.num_functions_defined = -1 |
| self.num_functions_implemented = -1 |
| self.api_complete = False |
| |
| # Determine last date HW was updated. |
| self.hw_last_modified = self._get_last_commit_date( |
| [self.hw_path / "rtl"], [""]) |
| |
| # Determine the main contributor of the HW. |
| self.hw_main_contributors = self._get_main_contributor_emails( |
| _OTComponent.HW) |
| if has_started: |
| # Determine last date DIF was updated. |
| self.dif_last_modified = self._get_last_commit_date( |
| [self.dif_path, self.dif_autogen_path], [".h", ".c"]) |
| # Determine the main contributor of the DIF. |
| self.dif_main_contributors = self._get_main_contributor_emails( |
| _OTComponent.DIF) |
| # Determine lifecycle state |
| self.lifecycle_state = self._get_dif_lifecycle_state() |
| # Determine DIF API completeness. |
| self.funcs_unimplemented = self._get_funcs_unimplemented() |
| else: |
| # Set DIF status data to indicate it has not started. |
| self.dif_last_modified = "-" |
| self.dif_main_contributors = [_NOT_STARTED] |
| self.lifecycle_state = "-" |
| self.funcs_unimplemented = [_NOT_STARTED] |
| |
| def _get_dif_lifecycle_state(self): |
| hjson_filename = self.hw_path / f"data/{self.ip}.prj.hjson" |
| with open(hjson_filename, "r") as life_f: |
| lifecycle_data = hjson.load(life_f) |
| # If there are multiple revisions, grab the latest. |
| if "revisions" in lifecycle_data: |
| lifecycle_data = lifecycle_data["revisions"][-1] |
| if "dif_stage" in lifecycle_data: |
| return lifecycle_data["dif_stage"] |
| return "-" |
| |
| def _get_main_contributor_emails(self, component): |
| # Get contributor stats for HW or DIF (SW) and sort by LOC. |
| if component == _OTComponent.DIF: |
| stats = self._get_contributors( |
| [self.dif_path, self.dif_autogen_path], [".h", ".c"]) |
| else: |
| stats = self._get_contributors([self.hw_path / "rtl"], [""]) |
| sorted_stats = sorted(stats.items(), key=lambda x: x[1], reverse=True) |
| # If the second contributor has contributed at least 10% as much as the |
| # first contributor, include both second and first contributors. |
| contributor_1_email, contributor_1_loc = sorted_stats[0] |
| if len(sorted_stats) > 1: |
| contributor_2_email, contributor_2_loc = sorted_stats[1] |
| if (float(contributor_2_loc) / float(contributor_1_loc)) > 0.1: |
| return [contributor_1_email, contributor_2_email] |
| return [contributor_1_email] |
| |
| def _get_contributors(self, file_paths, exts): |
| contributor_stats = collections.defaultdict(int) |
| for file_path, ext in itertools.product(file_paths, exts): |
| full_file_path = file_path.with_suffix(ext) |
| output = io.StringIO() |
| try: |
| # Use gitfame to fetch commit stats, captured from STDOUT. |
| with redirect_stdout(output): |
| gitfame.main(args=[ |
| f"--incl={full_file_path}", "-s", "-e", "--log=ERROR", |
| "--format=json" |
| ]) |
| except FileNotFoundError: |
| logging.error(f"(contributors) file path ({full_file_path}) " |
| "does not exist.") |
| sys.exit(1) |
| gitfame_commit_stats = json.loads(output.getvalue()) |
| for contributor_stat in gitfame_commit_stats["data"]: |
| contributor = contributor_stat[0] |
| loc = contributor_stat[1] |
| if loc == 0: |
| break |
| contributor_stats[contributor] += loc |
| return contributor_stats |
| |
| def _get_last_commit_date(self, file_paths, exts): |
| last_dif_commit_date = None |
| for file_path, ext in itertools.product(file_paths, exts): |
| full_file_path = file_path.with_suffix(ext) |
| try: |
| repo = pydriller.Repository( |
| str(REPO_TOP), filepath=full_file_path).traverse_commits() |
| except FileNotFoundError: |
| logging.error( |
| f"(date) file path ({full_file_path}) does not exist.") |
| sys.exit(1) |
| for commit in repo: |
| if last_dif_commit_date is None: |
| last_dif_commit_date = commit.author_date |
| else: |
| last_dif_commit_date = max(last_dif_commit_date, |
| commit.author_date) |
| return last_dif_commit_date.strftime("%Y-%m-%d %H:%M:%S") |
| |
| def _get_funcs_unimplemented(self): |
| defined_funcs = self._get_defined_funcs() |
| implemented_funcs = self._get_implemented_funcs() |
| self.num_functions_defined = len(defined_funcs) |
| self.num_functions_implemented = len(implemented_funcs) |
| self.api_complete = bool(defined_funcs and |
| defined_funcs == implemented_funcs) |
| if len(defined_funcs) < len(implemented_funcs): |
| logging.warning( |
| f"number of defined functions is less than implemented " |
| f"functions for {self.ip}. Results possibly invalid.") |
| print("Functions missing definitions:") |
| for impl_func in implemented_funcs: |
| if impl_func not in defined_funcs: |
| print(f"\t{impl_func}") |
| return defined_funcs - implemented_funcs |
| |
| def _get_defined_funcs(self): |
| header_file = self.dif_path.with_suffix(".h") |
| autogen_header_file = self.dif_autogen_path.with_suffix(".h") |
| defined_funcs = self._get_funcs(header_file) |
| defined_funcs |= self._get_funcs(autogen_header_file) |
| return defined_funcs |
| |
| def _get_implemented_funcs(self): |
| c_file = self.dif_path.with_suffix(".c") |
| c_autogen_file = self.dif_autogen_path.with_suffix(".c") |
| # The autogenerated header should always exist if the DIF has been |
| # started. |
| implemented_funcs = self._get_funcs(c_autogen_file) |
| # However, the manually-implemented header may not exist yet. |
| # If no .c file exists --> All functions are undefined. |
| if c_file.is_file(): |
| implemented_funcs |= self._get_funcs(c_file) |
| return implemented_funcs |
| |
| def _get_funcs(self, file_path): |
| func_pattern = re.compile(r"dif_result_t (dif_.*)\(.*") |
| funcs = set() |
| with open(file_path, "r") as fp: |
| for line in fp: |
| result = func_pattern.search(line) |
| if result is not None and not line.startswith("static"): |
| funcs.add(result.group(1)) |
| return funcs |
| |
| |
| def print_status_table(dif_statuses: List[DIFStatus], |
| table_format: str) -> None: |
| """Print a table of DIF status information to STDOUT. |
| |
| Args: |
| dif_statuses: List of DIFStatus objects containing metadata about DIF |
| development states. |
| |
| Returns: |
| None |
| """ |
| # Build the table. |
| rows = [] |
| headers = [ |
| "IP", "DIF Updated", "HW Updated", "DIF Contributor*", |
| "HW Contributor*", "Functions\nDefined", "Functions\nImplemented", |
| "Stage" |
| ] |
| for dif_status in dif_statuses: |
| # Color code last modified dates. |
| # Limit the last modified strings to 10 characters to only print the |
| # date (YYYY-MM-DD). |
| hw_last_modified = dif_status.hw_last_modified[:10] |
| dif_last_modified = dif_status.dif_last_modified[:10] |
| if dif_status.hw_last_modified > dif_status.dif_last_modified: |
| hw_last_modified = colored(hw_last_modified, "yellow") |
| dif_last_modified = colored(dif_last_modified, "yellow") |
| # Color code API complete status. |
| if dif_status.api_complete: |
| num_funcs_defined = colored(dif_status.num_functions_defined, |
| "green") |
| num_funcs_implemented = colored( |
| dif_status.num_functions_implemented, "green") |
| else: |
| num_funcs_defined = colored(dif_status.num_functions_defined, |
| "red") |
| num_funcs_implemented = colored( |
| dif_status.num_functions_implemented, "red") |
| |
| # Add row to table (printing one contributor email per line). |
| rows.append([ |
| dif_status.ip, dif_last_modified, hw_last_modified, |
| "\n".join(dif_status.dif_main_contributors), |
| "\n".join(dif_status.hw_main_contributors), num_funcs_defined, |
| num_funcs_implemented, dif_status.lifecycle_state |
| ]) |
| |
| # Print the table and legend. |
| print("DIF Statuses:") |
| print(tabulate(rows, headers, tablefmt=table_format)) |
| print("""*Only the top two contributors (by LOC) """ |
| """for each component are listed.""") |
| print(colored("Yellow", "yellow"), |
| "\t= HW has been updated since the DIF.") |
| print( |
| colored("Green", "green"), |
| """\t= DIF API, as defined in the current header file, is complete. """ |
| """Note, the header file may lack necessary API functionality.""") |
| print(colored("Red", "red"), |
| ("\t= DIF API is incomplete, as defined in the header file or the " |
| "work has not yet begun.")) |
| |
| |
| def print_unimplemented_difs(dif_statuses: List[DIFStatus], |
| table_format: str) -> None: |
| """Print a table of specific functions names DIF functions to STDOUT. |
| |
| Args: |
| dif_statuses: List of DIFStatus objects containing metadata about DIF |
| development states. |
| table_format: Format of output table to print. See tabulate module. |
| |
| Returns: |
| None |
| """ |
| # Build and print table. |
| print("Unimplemented Functions:") |
| rows = [] |
| headers = ["IP", "Function"] |
| for dif_status in dif_statuses: |
| if not dif_status.api_complete: |
| rows.append( |
| [dif_status.ip, "\n".join(dif_status.funcs_unimplemented)]) |
| print(tabulate(rows, headers, tablefmt=table_format)) |
| |
| |
| def main(argv): |
| # Process args and set logging level. |
| # TODO: parallelize data scraping so its much faster |
| parser = argparse.ArgumentParser( |
| prog="check_dif_statuses", |
| formatter_class=argparse.RawDescriptionHelpFormatter) |
| parser.add_argument( |
| "--show-unimplemented", |
| action="store_true", |
| help="""Show unimplemented functions for each incomplete DIF.""") |
| parser.add_argument("--table-format", |
| type=str, |
| choices=["grid", "github", "pipe"], |
| default="grid", |
| help="""Format to print status tables in.""") |
| parser.add_argument( |
| "top_hjson", |
| help="""Path to the top-level HJSON configuration file relative to |
| REPO_TOP.""") |
| args = parser.parse_args(argv) |
| logging.basicConfig(level=logging.WARNING) |
| |
| # Make sure to call this script from REPO_TOP. |
| if Path.cwd() != REPO_TOP: |
| logging.error(f"Must call script from \"$REPO_TOP\": {REPO_TOP}") |
| sys.exit(1) |
| |
| # Get the list of IP blocks by invoking the topgen tool. |
| topgen_tool = REPO_TOP / "util/topgen.py" |
| top_hjson = REPO_TOP / args.top_hjson |
| top_level = top_hjson.stem |
| top_hjson_text = top_hjson.read_text() |
| topcfg = hjson.loads(top_hjson_text, use_decimal=True) |
| ipgen_ips = lib.get_ipgen_modules(topcfg) |
| # yapf: disable |
| topgen_process = subprocess.run([topgen_tool, "-t", top_hjson, |
| "--get_blocks", "-o", REPO_TOP], |
| universal_newlines=True, |
| stdout=subprocess.PIPE, |
| check=True) |
| # yapf: enable |
| # All DIF names are prefixed with `dif_`. |
| difs = {f"dif_{dif.strip()}" for dif in topgen_process.stdout.split()} |
| |
| # Get DIF statuses (while displaying a progress bar). |
| dif_statuses = [] |
| progress_bar = enlighten.Counter(total=len(difs), |
| desc="Analyzing statuses of DIFs ...", |
| unit="DIFs") |
| for dif in difs: |
| dif_statuses.append(DIFStatus(ipgen_ips, top_level, dif)) |
| progress_bar.update() |
| dif_statuses.sort(key=lambda x: x.ip) |
| |
| # Build table and print it to STDOUT. |
| print_status_table(dif_statuses, args.table_format) |
| if args.show_unimplemented: |
| print_unimplemented_difs(dif_statuses, args.table_format) |
| |
| |
| if __name__ == "__main__": |
| main(sys.argv[1:]) |