[util] Added Python script to generate a status report of the DIFs.
Signed-off-by: Timothy Trippel <ttrippel@google.com>
diff --git a/util/check_dif_statuses.py b/util/check_dif_statuses.py
new file mode 100644
index 0000000..bc9e6ca
--- /dev/null
+++ b/util/check_dif_statuses.py
@@ -0,0 +1,364 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+# Copyright lowRISC contributors.
+# Licensed under the Apache License, Version 2.0, see LICENSE for details.
+# SPDX-License-Identifier: Apache-2.0
+"""DIF Status Report Generator.
+
+This tool generates a status report for the DIFs by cross referencing the git
+commit history of each DIF with that of the HW it actuates to provide OpenTitan
+developers with information about what DIFs require updating.
+
+To display usage run:
+ ./check_dif_statuses.py --help
+"""
+
+import argparse
+import collections
+import glob
+import io
+import json
+import logging
+import os
+import re
+import sys
+from contextlib import redirect_stdout
+from enum import Enum
+from typing import List
+
+import enlighten
+import gitfame
+import hjson
+import pydriller
+from tabulate import tabulate
+from termcolor import colored
+
+# The IP suffix of some DIFs are slightly different than the IP directory name
+# in the hw/ip/ directory, so here is the translation for the applicable IPs.
+IP_TRANSLATION = {
+ "plic": "rv_plic",
+ "entropy": "entropy_src",
+}
+
+
+class _OTComponent(Enum):
+ """Type of OpenTitan component."""
+ DIF = 1
+ HW = 2
+
+
+class DIFStatus:
+ """Holds all DIF status information for displaying.
+
+ Attributes:
+ dif_name (str): Full name of the DIF including the IP name.
+ ip (str): Name of the IP the DIF is associated with.
+ dif_path (str): Path to the DIF code.
+ hw_path (str): Path to the HW RTL associated with this DIF.
+ dif_last_modified (datetime): Date and time the DIF was last modified.
+ hw_last_modified (datetime): Date and time the HW was last modified.
+ dif_main_contributors (List[str]): List of emails of DIF contributors.
+ hw_main_constributors (List[str]): List of emails of HW contributors.
+ lifecycle_state (str): Lifecycle state string (e.g., S0, S1, ...).
+ num_functions_defined (int): Number of API functions defined.
+ num_functions_implemented (int): Number of API functions implemented.
+ api_complete (bool): Indicates if DIF implements all defined functions.
+ funcs_unimplemented (Set[str]): Set of unimplemted DIF functions.
+
+ """
+ def __init__(self, repo_top, difs_root_path, dif_name):
+ """Mines metadata to populate this DIFStatus object.
+
+ Args:
+ repo_top: Relative path of local OpenTitan repository.
+ difs_root_path: Path to DIF source code from repo_top.
+ dif_name: Full name of the DIF including the IP name.
+
+ Raises:
+ ValueError: Raised if DIF name does not start with "dif_".
+ """
+ # Get DIF/IP names and path.
+ if not dif_name.startswith("dif_"):
+ raise ValueError("DIF name should start with \"dif_\".")
+ self.dif_name = dif_name
+ self.ip = self.dif_name[4:]
+ if self.ip in IP_TRANSLATION:
+ self.ip = IP_TRANSLATION[self.ip]
+ self.dif_path = os.path.join(difs_root_path, dif_name)
+ self.hw_path = f"hw/ip/{self.ip}"
+ # Determine last date DIF and HW was updated.
+ self.dif_last_modified = self._get_last_commit_date(
+ repo_top, self.dif_path, [".h", ".c"])
+ self.hw_last_modified = self._get_last_commit_date(
+ repo_top, os.path.join(self.hw_path, "rtl"), [""])
+ # Determine the main contributor of the DIF and HW.
+ self.dif_main_contributors = self._get_main_contributor_emails(
+ _OTComponent.DIF)
+ self.hw_main_contributors = self._get_main_contributor_emails(
+ _OTComponent.HW)
+ # Determine lifecycle state
+ self.lifecycle_state = self._get_dif_lifecycle_state()
+ # Determine DIF API completeness.
+ self.num_functions_defined = -1
+ self.num_functions_implemented = -1
+ self.api_complete = False
+ self.funcs_unimplemented = self._get_funcs_unimplemented()
+
+ def _get_dif_lifecycle_state(self):
+ hjson_filename = os.path.join(self.hw_path, "data",
+ self.ip + ".prj.hjson")
+ with open(hjson_filename, "r") as life_f:
+ lifecycle_data = hjson.load(life_f)
+ # If there are multiple revisions, grab the latest.
+ if "revisions" in lifecycle_data:
+ lifecycle_data = lifecycle_data["revisions"][-1]
+ if "dif_stage" in lifecycle_data:
+ return lifecycle_data["dif_stage"]
+ return "-"
+
+ def _get_main_contributor_emails(self, component):
+ # Get contributor stats for HW or DIF (SW) and sort by LOC.
+ if component == _OTComponent.DIF:
+ stats = self._get_contributors(self.dif_path, [".h", ".c"])
+ else:
+ stats = self._get_contributors(os.path.join(self.hw_path, "rtl"),
+ ["/*"])
+ sorted_stats = sorted(stats.items(), key=lambda x: x[1], reverse=True)
+ # If the second contributor has contributed at least 10% as much as the
+ # first contributor, include both second and first contributors.
+ contributor_1_email, contributor_1_loc = sorted_stats[0]
+ if len(sorted_stats) > 1:
+ contributor_2_email, contributor_2_loc = sorted_stats[1]
+ if (float(contributor_2_loc) / float(contributor_1_loc)) > 0.1:
+ return [contributor_1_email, contributor_2_email]
+ return [contributor_1_email]
+
+ def _get_contributors(self, file_path, exts):
+ contributor_stats = collections.defaultdict(int)
+ for ext in exts:
+ # Check the file/path exists.
+ full_file_path = file_path + ext
+ if os.path.isfile(full_file_path) or (
+ full_file_path.endswith("*") and
+ os.path.isdir(full_file_path[:-2])):
+ # Use gitfame to fetch commit stats, captured from STDOUT.
+ output = io.StringIO()
+ with redirect_stdout(output):
+ gitfame.main(args=[
+ f"--incl={full_file_path}", "-s", "-e", "--log=ERROR",
+ "--format=json"
+ ])
+ gitfame_commit_stats = json.loads(output.getvalue())
+ for contributor_stat in gitfame_commit_stats["data"]:
+ contributor = contributor_stat[0]
+ loc = contributor_stat[1]
+ if loc == 0:
+ break
+ contributor_stats[contributor] += loc
+ else:
+ logging.error(
+ f"""(contributors) file path ({full_file_path}) """
+ """does not exist.""")
+ sys.exit(1)
+ return contributor_stats
+
+ def _get_last_commit_date(self, repo_top, file_path, exts):
+ last_dif_commit_date = None
+ for ext in exts:
+ # Check the file exists.
+ full_file_path = file_path + ext
+ if os.path.isfile(full_file_path) or os.path.isdir(full_file_path):
+ repo = pydriller.Repository(
+ repo_top, filepath=full_file_path).traverse_commits()
+ for commit in repo:
+ if last_dif_commit_date is None:
+ last_dif_commit_date = commit.author_date
+ else:
+ last_dif_commit_date = max(last_dif_commit_date,
+ commit.author_date)
+ else:
+ logging.error(
+ f"(date) file path ({full_file_path}) does not exist.")
+ sys.exit(1)
+ return last_dif_commit_date
+
+ def _get_funcs_unimplemented(self):
+ defined_funcs = self._get_defined_funcs()
+ implemented_funcs = self._get_implemented_funcs()
+ self.num_functions_defined = len(defined_funcs)
+ self.num_functions_implemented = len(implemented_funcs)
+ self.api_complete = bool(defined_funcs and
+ defined_funcs == implemented_funcs)
+ return defined_funcs - implemented_funcs
+
+ def _get_defined_funcs(self):
+ header_file = self.dif_path + ".h"
+ return self._get_funcs(header_file)
+
+ def _get_implemented_funcs(self):
+ c_file = self.dif_path + ".c"
+ # If no .c file exists --> All functions are undefined.
+ if not os.path.isfile(c_file):
+ return set()
+ return self._get_funcs(c_file)
+
+ def _get_funcs(self, file_path):
+ func_pattern = re.compile(r"^dif_.*_result_t (dif_.*)\(.*")
+ funcs = set()
+ with open(file_path, "r") as fp:
+ for line in fp:
+ result = func_pattern.search(line)
+ if result is not None:
+ funcs.add(result.group(1))
+ return funcs
+
+
+def get_list_of_difs(difs_root_path: str, shared_headers: List[str]) -> None:
+ """Get a list of the root filenames of the DIFs.
+
+ Args:
+ difs_root_path: Root path where DIF source files are located.
+ shared_headers: Header file(s) shared amongst DIFs.
+
+ Returns:
+ None
+ """
+ dif_headers = list(glob.glob(os.path.join(difs_root_path, "*.h")))
+ dif_headers = map(os.path.basename, dif_headers)
+ difs = set(map(lambda s: s.split(".")[0], dif_headers))
+ for header in shared_headers:
+ if header in difs:
+ difs.remove(header)
+ return difs
+
+
+def print_status_table(dif_statuses: List[DIFStatus],
+ table_format: str) -> None:
+ """Print a table of DIF status information to STDOUT.
+
+ Args:
+ dif_statuses: List of DIFStatus objects containing metadata about DIF
+ development states.
+
+ Returns:
+ None
+ """
+ # Build the table.
+ rows = []
+ headers = [
+ "IP", "DIF Updated", "HW Updated", "DIF Contributor*",
+ "HW Contributor*", "Functions\nDefined", "Functions\nImplemented",
+ "Stage"
+ ]
+ for dif_status in dif_statuses:
+ # Color code last modified dates.
+ if dif_status.hw_last_modified > dif_status.dif_last_modified:
+ hw_last_modified = colored(dif_status.hw_last_modified.date(),
+ "yellow")
+ dif_last_modified = colored(dif_status.dif_last_modified.date(),
+ "yellow")
+ else:
+ hw_last_modified = dif_status.hw_last_modified.date()
+ dif_last_modified = dif_status.dif_last_modified.date()
+ # Color code API complete status.
+ if dif_status.api_complete:
+ num_funcs_defined = colored(dif_status.num_functions_defined,
+ "green")
+ num_funcs_implemented = colored(
+ dif_status.num_functions_implemented, "green")
+ else:
+ num_funcs_defined = colored(dif_status.num_functions_defined,
+ "red")
+ num_funcs_implemented = colored(
+ dif_status.num_functions_implemented, "red")
+
+ # Add row to table (printing one contributor email per line).
+ rows.append([
+ dif_status.ip, dif_last_modified, hw_last_modified,
+ "\n".join(dif_status.dif_main_contributors),
+ "\n".join(dif_status.hw_main_contributors), num_funcs_defined,
+ num_funcs_implemented, dif_status.lifecycle_state
+ ])
+
+ # Print the table and legend.
+ print("DIF Statuses:")
+ print(tabulate(rows, headers, tablefmt=table_format))
+ print("""*Only the top two contributors (by LOC) """
+ """for each component are listed.""")
+ print(colored("Yellow", "yellow"),
+ "\t= HW has been updated since the DIF.")
+ print(
+ colored("Green", "green"),
+ """\t= DIF API, as defined in the current header file, is complete. """
+ """Note, the header file may lack necessary API functionality.""")
+ print(colored("Red", "red"),
+ "\t= DIF API is incomplete, as defined in the header file.")
+
+
+def print_unimplemented_functions(dif_statuses: List[DIFStatus],
+ table_format: str) -> None:
+ """Print a table of unimplemented DIF functions to STDOUT.
+
+ Args:
+ dif_statuses: List of DIFStatus objects containing metadata about DIF
+ development states.
+
+ Returns:
+ None
+ """
+ rows = []
+ headers = ["IP", "Function"]
+ for dif_status in dif_statuses:
+ if not dif_status.api_complete:
+ rows.append(
+ [dif_status.ip, "\n".join(dif_status.funcs_unimplemented)])
+ print("Unimplemented Functions:")
+ print(tabulate(rows, headers, tablefmt=table_format))
+
+
+def main(argv):
+ # Process args.
+ parser = argparse.ArgumentParser(
+ prog="check_dif_statuses",
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument(
+ "repo_top",
+ help=
+ """Relative path to where the OpenTitan repository is checked out.""")
+ parser.add_argument(
+ "--show-unimplemented",
+ action="store_true",
+ help="""Show unimplemented functions for each incomplete DIF.""")
+ parser.add_argument("--table-format",
+ type=str,
+ choices=["grid", "github", "pipe"],
+ default="grid",
+ help="""Format to print status tables in.""")
+ args = parser.parse_args(argv)
+
+ # Define root path of DIFs.
+ difs_root_path = os.path.join("sw", "device", "lib", "dif")
+
+ # Get list of all DIF basenames.
+ # TODO: automatically get the list below by cross referencing DIF names
+ # with IP block names. Hardcoded for now.
+ shared_headers = ["dif_warn_unused_result"]
+ difs = get_list_of_difs(difs_root_path, shared_headers)
+
+ # Get DIF statuses (while displaying a progress bar).
+ dif_statuses = []
+ progress_bar = enlighten.Counter(total=len(difs),
+ desc="Analyzing statuses of DIFs ...",
+ unit="DIFs")
+ for dif in difs:
+ dif_statuses.append(DIFStatus(args.repo_top, difs_root_path, dif))
+ progress_bar.update()
+
+ # Build table and print it to STDOUT.
+ print_status_table(dif_statuses, args.table_format)
+ if args.show_unimplemented:
+ print_unimplemented_functions(dif_statuses, args.table_format)
+
+
+if __name__ == "__main__":
+ main(sys.argv[1:])