blob: 35ab2f37c2b9d03705c39632515d7d765a9d76e3 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
"""DIF Status Report Generator.
This tool generates a status report for the DIFs by cross referencing the git
commit history of each DIF with that of the HW it actuates to provide OpenTitan
developers with information about what DIFs require updating.
To display usage run:
./check_dif_statuses.py --help
"""
import argparse
import collections
import glob
import io
import json
import logging
import os
import re
import subprocess
import sys
from contextlib import redirect_stdout
from enum import Enum
from pathlib import Path
from typing import List
import enlighten
import gitfame
import hjson
import pydriller
from tabulate import tabulate
from termcolor import colored
# Maintain a list of IPs that only exist in the top-level area.
#
# Note that there are several templated IPs that are auto-generated in the
# top-level area as well, but since the bulk of the code (including the
# template) lives in the hw/ip area, we do not need to consider them.
_TOP_LEVEL_IPS = {"ast", "sensor_ctrl"}
# Indicates that the DIF work has not yet started.
_NOT_STARTED = colored("NOT STARTED", "red")
# This file is $REPO_TOP/util/make_new_dif/ip.py, so it takes two parent()
# calls to get back to the top.
REPO_TOP = Path(__file__).resolve().parent.parent
class _OTComponent(Enum):
"""Type of OpenTitan component."""
DIF = 1
HW = 2
class _DIFFunctionType(Enum):
"""Type of DIF function."""
ALERT = 1
IRQ = 2
UNIMPLEMENTED = 3
class DIFStatus:
"""Holds all DIF status information for displaying.
Attributes:
dif_name (str): Full name of the DIF including the IP name.
ip (str): Name of the IP the DIF is associated with.
dif_path (str): Path to the DIF code.
hw_path (str): Path to the HW RTL associated with this DIF.
dif_last_modified (datetime): Date and time the DIF was last modified.
hw_last_modified (datetime): Date and time the HW was last modified.
dif_main_contributors (List[str]): List of emails of DIF contributors.
hw_main_constributors (List[str]): List of emails of HW contributors.
lifecycle_state (str): Lifecycle state string (e.g., S0, S1, ...).
num_functions_defined (int): Number of API functions defined.
num_functions_implemented (int): Number of API functions implemented.
api_complete (bool): Indicates if DIF implements all defined functions.
funcs_unimplemented (Set[str]): Set of unimplemted DIF functions.
"""
def __init__(self, top_level, difs_root_path, dif_name):
"""Mines metadata to populate this DIFStatus object.
Args:
top_level: Name of the top level design.
difs_root_path: Path to DIF source code from REPO_TOP.
dif_name: Full name of the DIF including the IP name.
Raises:
ValueError: Raised if DIF name does not start with "dif_".
"""
# Get DIF/IP names and path.
if not dif_name.startswith("dif_"):
raise ValueError("DIF name should start with \"dif_\".")
self.dif_name = dif_name
self.ip = self.dif_name[4:]
self.dif_path = os.path.join(difs_root_path, dif_name)
# Check if header file exists - if not then its not even begun.
has_started = os.path.isfile(self.dif_path + ".h")
self.hw_path = (f"hw/{top_level}/ip/{self.ip}"
if self.ip in _TOP_LEVEL_IPS else f"hw/ip/{self.ip}")
# Indicates DIF API completeness.
self.num_functions_defined = -1
self.num_functions_implemented = -1
self.api_complete = False
self.irq_funcs = set()
self.alert_funcs = set()
# Determine last date HW was updated.
self.hw_last_modified = self._get_last_commit_date(
os.path.join(self.hw_path, "rtl"), [""])
# Determine the main contributor of the HW.
self.hw_main_contributors = self._get_main_contributor_emails(
_OTComponent.HW)
if has_started:
# Determine last date DIF was updated.
self.dif_last_modified = self._get_last_commit_date(
self.dif_path, [".h", ".c"])
# Determine the main contributor of the DIF.
self.dif_main_contributors = self._get_main_contributor_emails(
_OTComponent.DIF)
# Determine lifecycle state
self.lifecycle_state = self._get_dif_lifecycle_state()
# Determine DIF API completeness.
self.funcs_unimplemented = self._get_funcs_unimplemented()
else:
self.dif_last_modified = "-"
self.dif_main_contributors = [_NOT_STARTED]
self.lifecycle_state = "-"
self.funcs_unimplemented = [_NOT_STARTED]
def _get_dif_lifecycle_state(self):
hjson_filename = os.path.join(self.hw_path, "data",
self.ip + ".prj.hjson")
with open(hjson_filename, "r") as life_f:
lifecycle_data = hjson.load(life_f)
# If there are multiple revisions, grab the latest.
if "revisions" in lifecycle_data:
lifecycle_data = lifecycle_data["revisions"][-1]
if "dif_stage" in lifecycle_data:
return lifecycle_data["dif_stage"]
return "-"
def _get_main_contributor_emails(self, component):
# Get contributor stats for HW or DIF (SW) and sort by LOC.
if component == _OTComponent.DIF:
stats = self._get_contributors(self.dif_path, [".h", ".c"])
else:
stats = self._get_contributors(os.path.join(self.hw_path, "rtl"),
["/*"])
sorted_stats = sorted(stats.items(), key=lambda x: x[1], reverse=True)
# If the second contributor has contributed at least 10% as much as the
# first contributor, include both second and first contributors.
contributor_1_email, contributor_1_loc = sorted_stats[0]
if len(sorted_stats) > 1:
contributor_2_email, contributor_2_loc = sorted_stats[1]
if (float(contributor_2_loc) / float(contributor_1_loc)) > 0.1:
return [contributor_1_email, contributor_2_email]
return [contributor_1_email]
def _get_contributors(self, file_path, exts):
contributor_stats = collections.defaultdict(int)
for ext in exts:
# Check the file/path exists.
full_file_path = file_path + ext
if os.path.isfile(full_file_path) or (
full_file_path.endswith("*") and
os.path.isdir(full_file_path[:-2])):
# Use gitfame to fetch commit stats, captured from STDOUT.
output = io.StringIO()
with redirect_stdout(output):
gitfame.main(args=[
f"--incl={full_file_path}", "-s", "-e", "--log=ERROR",
"--format=json"
])
gitfame_commit_stats = json.loads(output.getvalue())
for contributor_stat in gitfame_commit_stats["data"]:
contributor = contributor_stat[0]
loc = contributor_stat[1]
if loc == 0:
break
contributor_stats[contributor] += loc
else:
logging.error(
f"""(contributors) file path ({full_file_path}) """
"""does not exist.""")
sys.exit(1)
return contributor_stats
def _get_last_commit_date(self, file_path, exts):
last_dif_commit_date = None
for ext in exts:
# Check the file exists.
full_file_path = file_path + ext
if os.path.isfile(full_file_path) or os.path.isdir(full_file_path):
repo = pydriller.Repository(
str(REPO_TOP), filepath=full_file_path).traverse_commits()
for commit in repo:
if last_dif_commit_date is None:
last_dif_commit_date = commit.author_date
else:
last_dif_commit_date = max(last_dif_commit_date,
commit.author_date)
else:
logging.error(
f"(date) file path ({full_file_path}) does not exist.")
sys.exit(1)
return last_dif_commit_date.strftime("%Y-%m-%d %H:%M:%S")
def _get_funcs_unimplemented(self):
defined_funcs = self._get_defined_funcs()
implemented_funcs = self._get_implemented_funcs()
self.num_functions_defined = len(defined_funcs)
self.num_functions_implemented = len(implemented_funcs)
self.api_complete = bool(defined_funcs and
defined_funcs == implemented_funcs)
return defined_funcs - implemented_funcs
def _get_defined_funcs(self):
header_file = self.dif_path + ".h"
defined_funcs = self._get_funcs(header_file)
self.irq_funcs = self._get_irq_funcs_defined(defined_funcs)
self.alert_funcs = self._get_alert_funcs_defined(defined_funcs)
return defined_funcs
def _get_implemented_funcs(self):
c_file = self.dif_path + ".c"
# If no .c file exists --> All functions are undefined.
if not os.path.isfile(c_file):
return set()
return self._get_funcs(c_file)
def _get_funcs(self, file_path):
func_pattern = re.compile(r"^dif_result_t (dif_.*)\(.*")
funcs = set()
with open(file_path, "r") as fp:
for line in fp:
result = func_pattern.search(line)
if result is not None:
funcs.add(result.group(1))
return funcs
def _get_expected_irqs(self):
# TODO: parse HJSON to get expected IRQ info per IP
logging.error(f"{self._get_expected_irqs.__name__}() unimplemented.")
sys.exit(1)
def _get_expected_alerts(self):
# TODO: parse HJSON to get expected Alert info per IP
logging.error(f"{self._get_expected_irqs.__name__}() unimplemented.")
sys.exit(1)
def _get_irq_funcs_defined(self, defined_funcs):
assert defined_funcs and "Expected defined_funcs to be non-empty."
irq_funcs = set()
for func in defined_funcs:
if "irq" in func:
irq_funcs.add(func)
return irq_funcs
def _get_alert_funcs_defined(self, defined_funcs):
assert defined_funcs and "Expected defined_funcs to be non-empty."
alert_funcs = set()
for func in defined_funcs:
if "alert" in func:
alert_funcs.add(func)
return alert_funcs
def get_list_of_difs(difs_root_path: str, shared_headers: List[str]) -> None:
"""Get a list of the root filenames of the DIFs.
Args:
difs_root_path: Root path where DIF source files are located.
shared_headers: Header file(s) shared amongst DIFs.
Returns:
None
"""
dif_headers = list(glob.glob(os.path.join(difs_root_path, "*.h")))
dif_headers = map(os.path.basename, dif_headers)
difs = set(map(lambda s: s.split(".")[0], dif_headers))
for header in shared_headers:
if header in difs:
difs.remove(header)
return difs
def print_status_table(dif_statuses: List[DIFStatus],
table_format: str) -> None:
"""Print a table of DIF status information to STDOUT.
Args:
dif_statuses: List of DIFStatus objects containing metadata about DIF
development states.
Returns:
None
"""
# Build the table.
rows = []
headers = [
"IP", "DIF Updated", "HW Updated", "DIF Contributor*",
"HW Contributor*", "Functions\nDefined", "Functions\nImplemented",
"Stage"
]
for dif_status in dif_statuses:
# Color code last modified dates.
# Limit the last modified strings to 10 characters to only print the
# date (YYYY-MM-DD).
hw_last_modified = dif_status.hw_last_modified[:10]
dif_last_modified = dif_status.dif_last_modified[:10]
if dif_status.hw_last_modified > dif_status.dif_last_modified:
hw_last_modified = colored(hw_last_modified, "yellow")
dif_last_modified = colored(dif_last_modified, "yellow")
# Color code API complete status.
if dif_status.api_complete:
num_funcs_defined = colored(dif_status.num_functions_defined,
"green")
num_funcs_implemented = colored(
dif_status.num_functions_implemented, "green")
else:
num_funcs_defined = colored(dif_status.num_functions_defined,
"red")
num_funcs_implemented = colored(
dif_status.num_functions_implemented, "red")
# Add row to table (printing one contributor email per line).
rows.append([
dif_status.ip, dif_last_modified, hw_last_modified,
"\n".join(dif_status.dif_main_contributors),
"\n".join(dif_status.hw_main_contributors), num_funcs_defined,
num_funcs_implemented, dif_status.lifecycle_state
])
# Print the table and legend.
print("DIF Statuses:")
print(tabulate(rows, headers, tablefmt=table_format))
print("""*Only the top two contributors (by LOC) """
"""for each component are listed.""")
print(colored("Yellow", "yellow"),
"\t= HW has been updated since the DIF.")
print(
colored("Green", "green"),
"""\t= DIF API, as defined in the current header file, is complete. """
"""Note, the header file may lack necessary API functionality.""")
print(colored("Red", "red"),
("\t= DIF API is incomplete, as defined in the header file or the "
"work has not yet begun."))
def print_function_set(dif_statuses: List[DIFStatus],
dif_function_type: _DIFFunctionType,
table_format: str) -> None:
"""Print a table of specific functions names DIF functions to STDOUT.
Args:
dif_statuses: List of DIFStatus objects containing metadata about DIF
development states.
dif_function_type: DIFs to display in {ALERT, IRQ, UNIMPLEMENTED}
table_format: Format of output table to print. See tabulate module.
Returns:
None
"""
# Print label of function type.
if dif_function_type == _DIFFunctionType.ALERT:
print("Alert Functions:")
elif dif_function_type == _DIFFunctionType.IRQ:
print("IRQ Functions:")
elif dif_function_type == _DIFFunctionType.UNIMPLEMENTED:
print("Unimplemented Functions:")
else:
logging.error("Invalid function type to print table.")
sys.exit(1)
# Build and print table.
rows = []
headers = ["IP", "Function"]
for dif_status in dif_statuses:
if dif_function_type == _DIFFunctionType.ALERT:
if dif_status.alert_funcs:
rows.append([dif_status.ip, "\n".join(dif_status.alert_funcs)])
elif dif_function_type == _DIFFunctionType.IRQ:
if dif_status.irq_funcs:
rows.append([dif_status.ip, "\n".join(dif_status.irq_funcs)])
elif dif_function_type == _DIFFunctionType.UNIMPLEMENTED:
if not dif_status.api_complete:
rows.append(
[dif_status.ip, "\n".join(dif_status.funcs_unimplemented)])
else:
# Unreachable.
logging.error("Invalid function type to print table.")
sys.exit(1)
print(tabulate(rows, headers, tablefmt=table_format))
def main(argv):
# Process args and set logging level.
# TODO: parallelize data scraping so its much faster
parser = argparse.ArgumentParser(
prog="check_dif_statuses",
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
"--top-hjson",
help="""Path to the top-level HJSON configuration file relative to
REPO_TOP.""")
parser.add_argument(
"--show-unimplemented",
action="store_true",
help="""Show unimplemented functions for each incomplete DIF.""")
parser.add_argument("--show-alerts",
action="store_true",
help="""Show alert functions for each DIF.""")
parser.add_argument("--show-irqs",
action="store_true",
help="""Show IRQ functions for each DIF.""")
parser.add_argument("--table-format",
type=str,
choices=["grid", "github", "pipe"],
default="grid",
help="""Format to print status tables in.""")
args = parser.parse_args(argv)
logging.basicConfig(level=logging.WARNING)
# Define root path of DIFs.
difs_root_path = os.path.join("sw", "device", "lib", "dif")
if args.top_hjson:
# Get the list of IP blocks by invoking the topgen tool.
topgen_tool = os.path.join(REPO_TOP, "util", "topgen.py")
top_hjson = os.path.join(REPO_TOP, args.top_hjson)
top_level = Path(top_hjson).stem
# yapf: disable
topgen_process = subprocess.run([topgen_tool, "-t", top_hjson,
"--get_blocks", "-o", REPO_TOP],
text=True,
universal_newlines=True,
stdout=subprocess.PIPE,
check=True)
# yapf: enable
# All DIF names are prefixed with `dif_`.
difs = {f"dif_{dif.strip()}" for dif in topgen_process.stdout.split()}
else:
# Get list of all DIF basenames.
# TODO: automatically get the list below by cross referencing DIF names
# with IP block names. Hardcoded for now.
print("WARNING: It is recommended to pass the --top-hjson switch to "
"get a more accurate representation of the DIF progress. The "
"list of IPs for which no DIF sources exist is unknown.")
shared_headers = ["dif_base"]
top_level = "top_earlgrey"
difs = get_list_of_difs(difs_root_path, shared_headers)
# Get DIF statuses (while displaying a progress bar).
dif_statuses = []
progress_bar = enlighten.Counter(total=len(difs),
desc="Analyzing statuses of DIFs ...",
unit="DIFs")
for dif in difs:
dif_statuses.append(DIFStatus(top_level, difs_root_path, dif))
progress_bar.update()
# Build table and print it to STDOUT.
print_status_table(dif_statuses, args.table_format)
if args.show_unimplemented:
print_function_set(dif_statuses, _DIFFunctionType.UNIMPLEMENTED,
args.table_format)
if args.show_alerts:
print_function_set(dif_statuses, _DIFFunctionType.ALERT,
args.table_format)
if args.show_irqs:
print_function_set(dif_statuses, _DIFFunctionType.IRQ,
args.table_format)
if __name__ == "__main__":
main(sys.argv[1:])