# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
import datetime
import json
import logging as log
import os
import pprint
import re
import subprocess
import sys
from pathlib import Path
from shutil import which

import hjson

from CfgJson import set_target_attribute
from LauncherFactory import get_launcher_cls
from Scheduler import Scheduler
from utils import (VERBOSE, clean_odirs, find_and_substitute_wildcards,
md_results_to_html, mk_path, rm_path, subst_wildcards)


# Interface class for extensions.
class FlowCfg():
    '''Base class for the different flows supported by dvsim.py

    The constructor expects some parsed hjson data. Create these objects with
    the factory function in CfgFactory.py, which loads the hjson data and picks
    a subclass of FlowCfg based on its contents.
    '''
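
    # A hedged usage sketch (the factory function name and arguments are
    # illustrative; CfgFactory.py is what actually constructs these objects):
    #
    #     from CfgFactory import make_cfg
    #     cfg = make_cfg(flow_cfg_file, args, proj_root)
    #     cfg.create_deploy_objects()
    #     results = cfg.deploy_objects()
    #     cfg.gen_results(results)
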
# Set in subclasses. This is the key that must be used in an hjson file to
# tell dvsim.py which subclass to use.
flow = None
# Can be overridden in subclasses to configure which wildcards to ignore
# when expanding hjson.
ignored_wildcards = []
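
    # Illustrative sketch (hedged; the exact values live in the subclasses):
    # a simulation flow subclass would set something like
    #
    #     flow = 'sim'
    #     ignored_wildcards = ['seed', 'uvm_test', 'uvm_test_seq']
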
def __str__(self):
return pprint.pformat(self.__dict__)

    def __init__(self, flow_cfg_file, hjson_data, args, mk_config):
# Options set from command line
# Uniquify input items, while preserving the order.
self.items = list(dict.fromkeys(args.items))
self.list_items = args.list
self.select_cfgs = args.select_cfgs
self.flow_cfg_file = flow_cfg_file
self.args = args
self.scratch_root = args.scratch_root
self.branch = args.branch
self.job_prefix = args.job_prefix
self.gui = args.gui
self.interactive = args.interactive
# Options set from hjson cfg.
self.project = ""
self.scratch_path = ""
self.scratch_base_path = ""
# Add exports using 'exports' keyword - these are exported to the child
# process' environment.
self.exports = []
# Add overrides using the overrides keyword - existing attributes
# are overridden with the override values.
self.overrides = []
# List of cfgs if the parsed cfg is a primary cfg list
self.cfgs = []
# Add a notion of "primary" cfg - this is indicated using
# a special key 'use_cfgs' within the hjson cfg.
self.is_primary_cfg = False
# For a primary cfg, it is the aggregated list of all deploy objects
# under self.cfgs. For a non-primary cfg, it is the list of items
# slated for dispatch.
self.deploy = []
# Timestamp
self.timestamp_long = args.timestamp_long
self.timestamp = args.timestamp
# Results
self.errors_seen = False
self.rel_path = ""
self.results_title = ""
self.revision = ""
self.results_server_prefix = ""
self.results_server_cmd = ""
self.css_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "style.css")
        # `self.results_*` below will be updated after `self.rel_path` and
        # `self.scratch_base_path` variables are updated.
self.results_dir = ""
self.results_page = ""
self.results_server_path = ""
self.results_server_dir = ""
self.results_server_page = ""
self.results_server_url = ""
self.results_html_name = ""
# Full results in md text
self.results_md = ""
# Selectively sanitized md results to be published
self.publish_results_md = ""
self.sanitize_publish_results = False
# Summary results, generated by over-arching cfg
self.results_summary_md = ""
# Merge in the values from the loaded hjson file. If subclasses want to
# add other default parameters that depend on the parameters above,
# they can override _merge_hjson and add their parameters at the start
# of that.
self._merge_hjson(hjson_data)
# Is this a primary config? If so, we need to load up all the child
# configurations at this point. If not, we place ourselves into
# self.cfgs and consider ourselves a sort of "degenerate primary
# configuration".
self.is_primary_cfg = 'use_cfgs' in hjson_data
if not self.is_primary_cfg:
self.cfgs.append(self)
else:
for entry in self.use_cfgs:
self._load_child_cfg(entry, mk_config)
if self.rel_path == "":
self.rel_path = os.path.dirname(self.flow_cfg_file).replace(
self.proj_root + '/', '')
# Process overrides before substituting wildcards
self._process_overrides()
# Expand wildcards. If subclasses need to mess around with parameters
# after merging the hjson but before expansion, they can override
# _expand and add the code at the start.
self._expand()
# Construct the path variables after variable expansion.
self.results_dir = (Path(self.scratch_base_path) / "reports" /
self.rel_path / "latest")
self.results_page = (self.results_dir / self.results_html_name)
tmp_path = self.results_server + "/" + self.rel_path
self.results_server_path = self.results_server_prefix + tmp_path
tmp_path += "/latest"
self.results_server_dir = self.results_server_prefix + tmp_path
tmp_path += "/" + self.results_html_name
self.results_server_page = self.results_server_prefix + tmp_path
self.results_server_url = "https://" + tmp_path
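
        # For illustration (hedged, made-up values): with results_server
        # "example.org" (set via the hjson cfg), results_server_prefix
        # "gs://", rel_path "hw/ip/uart/dv" and results_html_name
        # "report.html", the code above yields:
        #
        #   results_server_path: gs://example.org/hw/ip/uart/dv
        #   results_server_dir:  gs://example.org/hw/ip/uart/dv/latest
        #   results_server_page: gs://example.org/hw/ip/uart/dv/latest/report.html
        #   results_server_url:  https://example.org/hw/ip/uart/dv/latest/report.html
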
# Run any final checks
self._post_init()

    def _merge_hjson(self, hjson_data):
        '''Take hjson data and merge it into self.__dict__

        Subclasses that need to do something just before the merge should
        override this method and call super()._merge_hjson(..) at the end.
        '''
for key, value in hjson_data.items():
set_target_attribute(self.flow_cfg_file, self.__dict__, key, value)

    def _expand(self):
        '''Called to expand wildcards after merging hjson

        Subclasses can override this to do something just before expansion.
        '''
# If this is a primary configuration, it doesn't matter if we don't
# manage to expand everything.
partial = self.is_primary_cfg
self.__dict__ = find_and_substitute_wildcards(self.__dict__,
self.__dict__,
self.ignored_wildcards,
ignore_error=partial)
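
        # Illustrative example (hedged): wildcards are brace-style references
        # to other keys in this dict, so an hjson value such as
        #
        #   build_dir: "{scratch_path}/{build_mode}"
        #
        # would be rewritten here with the `scratch_path` and `build_mode`
        # values, once those are themselves fully expanded.
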
def _post_init(self):
# Run some post init checks
if not self.is_primary_cfg:
# Check if self.cfgs is a list of exactly 1 item (self)
if not (len(self.cfgs) == 1 and self.cfgs[0].name == self.name):
log.error("Parse error!\n%s", self.cfgs)
sys.exit(1)

    def create_instance(self, mk_config, flow_cfg_file):
        '''Create a new instance of this class for the given config file.

        mk_config is a factory method (passed explicitly to avoid a circular
        dependency between this file and CfgFactory.py).
        '''
new_instance = mk_config(flow_cfg_file)
# Sanity check to make sure the new object is the same class as us: we
# don't yet support heterogeneous primary configurations.
if type(self) is not type(new_instance):
log.error("{}: Loading child configuration at {!r}, but the "
"resulting flow types don't match: ({} vs. {}).".format(
self.flow_cfg_file, flow_cfg_file,
type(self).__name__,
type(new_instance).__name__))
sys.exit(1)
return new_instance

    def _load_child_cfg(self, entry, mk_config):
'''Load a child configuration for a primary cfg'''
if type(entry) is str:
# Treat this as a file entry. Substitute wildcards in cfg_file
# files since we need to process them right away.
cfg_file = subst_wildcards(entry, self.__dict__, ignore_error=True)
self.cfgs.append(self.create_instance(mk_config, cfg_file))
elif type(entry) is dict:
# Treat this as a cfg expanded in-line
temp_cfg_file = self._conv_inline_cfg_to_hjson(entry)
if not temp_cfg_file:
return
self.cfgs.append(self.create_instance(mk_config, temp_cfg_file))
# Delete the temp_cfg_file once the instance is created
log.log(VERBOSE, "Deleting temp cfg file:\n%s", temp_cfg_file)
rm_path(temp_cfg_file, ignore_error=True)
else:
log.error(
"Type of entry \"%s\" in the \"use_cfgs\" key is invalid: %s",
entry, str(type(entry)))
sys.exit(1)
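
    # Illustrative example (hedged) of the two `use_cfgs` entry forms handled
    # above, as they would appear in a primary cfg hjson file:
    #
    #   use_cfgs: [// a file entry; wildcards are substituted right away:
    #              "{proj_root}/hw/ip/uart/dv/uart_sim_cfg.hjson",
    #              // an in-line cfg, dumped to a temp hjson file below:
    #              {name: "uart", ...}]
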
def _conv_inline_cfg_to_hjson(self, idict):
'''Dump a temp hjson file in the scratch space from input dict.
This method is to be called only by a primary cfg'''
if not self.is_primary_cfg:
log.fatal("This method can only be called by a primary cfg")
sys.exit(1)
name = idict["name"] if "name" in idict.keys() else None
if not name:
log.error("In-line entry in use_cfgs list does not contain a "
"\"name\" key (will be skipped!):\n%s", idict)
return None
# Check if temp cfg file already exists
temp_cfg_file = (self.scratch_root + "/." + self.branch + "__" + name +
"_cfg.hjson")
# Create the file and dump the dict as hjson
log.log(VERBOSE, "Dumping inline cfg \"%s\" in hjson to:\n%s", name,
temp_cfg_file)
try:
with open(temp_cfg_file, "w") as f:
f.write(hjson.dumps(idict, for_json=True))
except Exception as e:
log.error("Failed to hjson-dump temp cfg file\"%s\" for \"%s\""
"(will be skipped!) due to:\n%s", temp_cfg_file, name, e)
return None
# Return the temp cfg file created
return temp_cfg_file
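
    # For illustration (hedged, made-up values): with scratch_root "/scratch",
    # branch "master" and an in-line cfg named "uart", the temp file above
    # would be "/scratch/.master__uart_cfg.hjson".
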
def _process_overrides(self):
# Look through the dict and find available overrides.
# If override is available, check if the type of the value for existing
# and the overridden keys are the same.
overrides_dict = {}
if hasattr(self, "overrides"):
overrides = getattr(self, "overrides")
if type(overrides) is not list:
log.error("The type of key \"overrides\" is %s - it should be "
"a list", type(overrides))
sys.exit(1)
# Process override one by one
for item in overrides:
if type(item) is dict and set(
item.keys()) == {"name", "value"}:
ov_name = item["name"]
ov_value = item["value"]
if ov_name not in overrides_dict.keys():
overrides_dict[ov_name] = ov_value
self._do_override(ov_name, ov_value)
else:
log.error("Override for key \"%s\" already exists!\n"
"Old: %s\nNew: %s", ov_name,
overrides_dict[ov_name], ov_value)
sys.exit(1)
else:
log.error("\"overrides\" is a list of dicts with "
"{\"name\": <name>, \"value\": <value>} pairs. "
"Found this instead:\n%s", str(item))
sys.exit(1)
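
    # Illustrative example (hedged) of the expected `overrides` shape in an
    # hjson cfg; each entry replaces the value of an existing attribute of the
    # same name and type:
    #
    #   overrides: [
    #     {name: "scratch_path", value: "/tmp/my_scratch"},
    #   ]
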
def _do_override(self, ov_name, ov_value):
# Go through self attributes and replace with overrides
if hasattr(self, ov_name):
orig_value = getattr(self, ov_name)
if type(orig_value) == type(ov_value):
log.debug("Overriding \"%s\" value \"%s\" with \"%s\"",
ov_name, orig_value, ov_value)
setattr(self, ov_name, ov_value)
else:
log.error("The type of override value \"%s\" for \"%s\" "
"mismatches the type of original value \"%s\"",
ov_value, ov_name, orig_value)
sys.exit(1)
else:
log.error("Override key \"%s\" not found in the cfg!", ov_name)
sys.exit(1)

    def _purge(self):
        '''Purge the existing scratch areas in preparation for the new run.'''
        return

    def purge(self):
'''Public facing API for _purge().'''
for item in self.cfgs:
item._purge()

    def _print_list(self):
'''Print the list of available items that can be kicked off.'''
return

    def print_list(self):
'''Public facing API for _print_list().'''
for item in self.cfgs:
item._print_list()

    def prune_selected_cfgs(self):
'''Prune the list of configs for a primary config file.'''
# This should run after self.cfgs has been set
assert self.cfgs
# If the user didn't pass --select-cfgs, we don't do anything.
if self.select_cfgs is None:
return
# If the user passed --select-cfgs, but this isn't a primary config
# file, we should probably complain.
if not self.is_primary_cfg:
log.error('The configuration file at {!r} is not a primary '
'config, but --select-cfgs was passed on the command '
'line.'.format(self.flow_cfg_file))
sys.exit(1)
# Filter configurations
self.cfgs = [c for c in self.cfgs if c.name in self.select_cfgs]

    def _create_deploy_objects(self):
        '''Create deploy objects from the items slated to be run.

        The build and run deploy objects are created from the objects returned
        by the create_objects() method.
        '''
return

    def create_deploy_objects(self):
        '''Public facing API for _create_deploy_objects().'''
self.prune_selected_cfgs()
        # GUI or interactive mode is allowed only for one cfg.
        if (self.gui or self.interactive) and len(self.cfgs) > 1:
            log.fatal("In GUI or interactive mode, only one cfg can be run.")
sys.exit(1)
for item in self.cfgs:
item._create_deploy_objects()

    def deploy_objects(self):
        '''Public facing API for deploying all available objects.

        Runs each job and returns a map from item to status.
        '''
deploy = []
for item in self.cfgs:
deploy.extend(item.deploy)
if not deploy:
log.error("Nothing to run!")
sys.exit(1)
return Scheduler(deploy, get_launcher_cls(), self.interactive).run()

    def _gen_results(self, results):
        '''Generate the flow results; called after the flow has completed.

        It collates the status of all run targets and generates a dict. It
        parses the logs to identify the errors, warnings and failures as
        applicable. It also prints the full list of failures to the final
        report (in markdown format) for debug / triage.

        results should be a dictionary mapping deployed item to result.
        '''
return

    def gen_results(self, results):
        '''Public facing API for _gen_results().

        results should be a dictionary mapping deployed item to result.
        '''
for item in self.cfgs:
json_str = (item._gen_json_results(results)
if hasattr(item, '_gen_json_results')
else None)
result = item._gen_results(results)
log.info("[results]: [%s]:\n%s\n", item.name, result)
log.info("[scratch_path]: [%s] [%s]", item.name, item.scratch_path)
item.write_results(self.results_html_name, item.results_md,
json_str)
log.log(VERBOSE, "[report]: [%s] [%s/report.html]", item.name,
item.results_dir)
self.errors_seen |= item.errors_seen
if self.is_primary_cfg:
self.gen_results_summary()
self.write_results(self.results_html_name,
self.results_summary_md)

    def gen_results_summary(self):
        '''Public facing API to generate summary results for each IP/cfg file.
        '''
return

    def write_results(self, html_filename, text_md, json_str=None):
        """Write results to files.

        This function converts text_md to HTML and writes the result to a file
        in self.results_dir with the file name given by html_filename. If
        json_str is not None, this function additionally writes json_str to a
        file with the same path and base name as the HTML file but with '.json'
        as suffix.
        """
# Prepare reports directory, keeping 90 day history.
clean_odirs(odir=self.results_dir, max_odirs=89)
mk_path(self.results_dir)
# Write results to the report area.
with open(self.results_dir / html_filename, "w") as f:
f.write(
md_results_to_html(self.results_title, self.css_file, text_md))
if json_str is not None:
filename = Path(html_filename).with_suffix('.json')
with open(self.results_dir / filename, "w") as f:
f.write(json_str)
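
    # For illustration: if html_filename is "report.html" and json_str is not
    # None, this writes "report.html" and "report.json" side by side in
    # self.results_dir.
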
def _get_results_page_link(self, relative_to, link_text=''):
"""Create a relative markdown link to the results page."""
link_text = self.name.upper() if not link_text else link_text
relative_link = os.path.relpath(self.results_page,
relative_to)
return "[%s](%s)" % (link_text, relative_link)
    def _publish_results(self):
        '''Publish results to the opentitan web server.

        Results are uploaded to {results_server_page}.

        If the 'latest' directory exists, then it is renamed to its 'timestamp'
        directory. If the list of directories in this area grows beyond 15, the
        oldest entries are removed. Links to the last 7 regression results are
        appended at the end of the results page.
        '''
if which('gsutil') is None or which('gcloud') is None:
log.error("Google cloud SDK not installed! Cannot access the "
"results server")
return
# Timeformat for moving the dir
tf = "%Y.%m.%d_%H.%M.%S"
# Extract the timestamp of the existing self.results_server_page
cmd = (self.results_server_cmd + " ls -L " +
self.results_server_page + " | grep \'Creation time:\'")
log.log(VERBOSE, cmd)
cmd_output = subprocess.run(cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
old_results_ts = cmd_output.stdout.decode("utf-8")
old_results_ts = old_results_ts.replace("Creation time:", "")
old_results_ts = old_results_ts.strip()
# Move the 'latest' to its timestamp directory if lookup succeeded
if cmd_output.returncode == 0:
try:
if old_results_ts != "":
ts = datetime.datetime.strptime(
old_results_ts, "%a, %d %b %Y %H:%M:%S %Z")
old_results_ts = ts.strftime(tf)
            except ValueError as e:
                log.error(
                    "\'%s\': Timestamp conversion value error raised:\n%s",
                    old_results_ts, e)
                old_results_ts = ""
# If the timestamp conversion failed - then create a dummy one with
# yesterday's date.
if old_results_ts == "":
log.log(VERBOSE,
"Creating dummy timestamp with yesterday's date")
ts = datetime.datetime.now(
datetime.timezone.utc) - datetime.timedelta(days=1)
old_results_ts = ts.strftime(tf)
old_results_dir = self.results_server_path + "/" + old_results_ts
cmd = (self.results_server_cmd + " mv " + self.results_server_dir +
" " + old_results_dir)
log.log(VERBOSE, cmd)
cmd_output = subprocess.run(cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
if cmd_output.returncode != 0:
log.error("Failed to mv old results page \"%s\" to \"%s\"!",
self.results_server_dir, old_results_dir)
# Do an ls in the results root dir to check what directories exist.
results_dirs = []
cmd = self.results_server_cmd + " ls " + self.results_server_path
log.log(VERBOSE, cmd)
cmd_output = subprocess.run(args=cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
if cmd_output.returncode == 0:
# Some directories exist. Check if 'latest' is one of them
results_dirs = cmd_output.stdout.decode("utf-8").strip()
results_dirs = results_dirs.split("\n")
else:
log.log(VERBOSE, "Failed to run \"%s\"!", cmd)
        # Start pruning
        log.log(VERBOSE, "Pruning %s area to keep the last 7 results",
                self.results_server_path)
rdirs = []
for rdir in results_dirs:
dirname = rdir.replace(self.results_server_path, '')
dirname = dirname.replace('/', '')
# Only track history directories with format
# "year.month.date_hour.min.sec".
            if not re.match(r"\d+\.\d+\.\d+_\d+\.\d+\.\d+", dirname):
                continue
rdirs.append(dirname)
rdirs.sort(reverse=True)
rm_cmd = ""
history_txt = "\n## Past Results\n"
history_txt += "- [Latest](../latest/" + self.results_html_name + ")\n"
        for i, rdir in enumerate(rdirs):
            if i < 7:
                rdir_url = '../' + rdir + "/" + self.results_html_name
                history_txt += "- [{}]({})\n".format(rdir, rdir_url)
            elif i > 14:
                rm_cmd += self.results_server_path + '/' + rdir + " "
if rm_cmd != "":
rm_cmd = self.results_server_cmd + " -m rm -r " + rm_cmd + "; "
# Append the history to the results.
publish_results_md = self.publish_results_md or self.results_md
publish_results_md = publish_results_md + history_txt
# Publish the results page.
# First, write the results html and json files to the scratch area.
json_str = (json.dumps(self.results_dict)
if hasattr(self, 'results_dict')
else None)
self.write_results("publish.html", publish_results_md, json_str)
results_html_file = self.results_dir / "publish.html"
# Second, copy the files to the server.
log.info("Publishing results to %s", self.results_server_url)
suffixes = ['html'] + (['json'] if json_str is not None else [])
for suffix in suffixes:
src = str(Path(results_html_file).with_suffix('.' + suffix))
dst = self.results_server_page
# results_server_page has '.html' as suffix. If that does not match
# suffix, change it.
if suffix != 'html':
assert dst[-5:] == '.html'
dst = dst[:-5] + '.json'
cmd = f"{self.results_server_cmd} cp {src} {dst}"
log.log(VERBOSE, cmd)
try:
cmd_output = subprocess.run(args=cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
except Exception as e:
log.error("%s: Failed to publish results:\n\"%s\"", e, str(cmd))

    def publish_results(self):
'''Public facing API for publishing results to the opentitan web
server.
'''
for item in self.cfgs:
item._publish_results()
if self.is_primary_cfg:
self.publish_results_summary()

    def publish_results_summary(self):
        '''Public facing API for publishing the summary results to the
        opentitan web server.
        '''
# Publish the results page.
log.info("Publishing results summary to %s", self.results_server_url)
cmd = (self.results_server_cmd + " cp " +
str(self.results_page) + " " +
self.results_server_page)
log.log(VERBOSE, cmd)
try:
cmd_output = subprocess.run(args=cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
except Exception as e:
log.error("%s: Failed to publish results:\n\"%s\"", e, str(cmd))

    def has_errors(self):
return self.errors_seen