blob: 75ff17d8ce8857780b899115cf715b2ebd9c8108 [file] [log] [blame]
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
import datetime
import logging as log
import os
import pprint
from shutil import which
import subprocess
import sys
import hjson
from Deploy import Deploy
from utils import VERBOSE, md_results_to_html, parse_hjson, subst_wildcards
# A set of fields that can be overridden on the command line.
_CMDLINE_FIELDS = {'tool', 'verbosity'}
# Interface class for extensions.
class FlowCfg():
def __str__(self):
return pprint.pformat(self.__dict__)
def __init__(self, flow_cfg_file, proj_root, args):
# Options set from command line
self.items = args.items
self.list_items = args.list
self.select_cfgs = args.select_cfgs
self.flow_cfg_file = flow_cfg_file
self.proj_root = proj_root
self.args = args
self.scratch_root = args.scratch_root
self.branch = args.branch
self.job_prefix = args.job_prefix
# Options set from hjson cfg.
self.project = ""
self.scratch_path = ""
# Imported cfg files using 'import_cfgs' keyword
self.imported_cfg_files = [flow_cfg_file]
# Add exports using 'exports' keyword - these are exported to the child
# process' environment.
self.exports = []
# Add overrides using the overrides keyword - existing attributes
# are overridden with the override values.
self.overrides = []
# List of cfgs if the parsed cfg is a primary cfg list
self.cfgs = []
# Add a notion of "primary" cfg - this is indicated using
# a special key 'use_cfgs' within the hjson cfg.
self.is_primary_cfg = False
# For a primary cfg, it is the aggregated list of all deploy objects under self.cfgs.
# For a non-primary cfg, it is the list of items slated for dispatch.
self.deploy = []
# Timestamp
self.ts_format_long = args.ts_format_long
self.timestamp_long = args.timestamp_long
self.ts_format = args.ts_format
self.timestamp = args.timestamp
# Results
self.errors_seen = False
self.rel_path = ""
self.results_title = ""
self.revision_string = ""
self.results_server_prefix = ""
self.results_server_url_prefix = ""
self.results_server_cmd = ""
self.css_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), "style.css")
self.results_server_path = ""
self.results_server_dir = ""
self.results_server_html = ""
self.results_server_page = ""
self.results_summary_server_html = ""
self.results_summary_server_page = ""
# Full results in md text
self.results_md = ""
# Selectively sanitized md results to be mailed out or published
self.email_results_md = ""
self.publish_results_md = ""
self.sanitize_email_results = False
self.sanitize_publish_results = False
# Summary results, generated by over-arching cfg
self.email_summary_md = ""
self.results_summary_md = ""
def __post_init__(self):
# Run some post init checks
if not self.is_primary_cfg:
# Check if self.cfgs is a list of exactly 1 item (self)
if not (len(self.cfgs) == 1 and self.cfgs[0].name == self.name):
log.error("Parse error!\n%s", self.cfgs)
sys.exit(1)
def create_instance(self, flow_cfg_file):
'''Create a new instance of this class for the given config file.
'''
return type(self)(flow_cfg_file, self.proj_root, self.args)
def kill(self):
'''kill running processes and jobs gracefully
'''
for item in self.deploy:
item.kill()
def _parse_cfg(self, path, is_entry_point):
'''Load an hjson config file at path and update self accordingly.
If is_entry_point is true, this is the top-level configuration file, so
it's possible that this is a primary config.
'''
hjson_dict = parse_hjson(path)
# Check if this is the primary cfg, if this is the entry point cfg file
if is_entry_point:
self.is_primary_cfg = self.check_if_primary_cfg(hjson_dict)
# If not a primary cfg, then register self with self.cfgs
if self.is_primary_cfg is False:
self.cfgs.append(self)
# Resolve the raw hjson dict to build this object
self.resolve_hjson_raw(path, hjson_dict)
def _parse_flow_cfg(self, path):
'''Parse the flow's hjson configuration.
This is a private API which should be called by the __init__ method of
each subclass.
'''
self._parse_cfg(path, True)
if self.rel_path == "":
self.rel_path = os.path.dirname(self.flow_cfg_file).replace(
self.proj_root + '/', '')
def check_if_primary_cfg(self, hjson_dict):
# This is a primary cfg only if it has a single key called "use_cfgs"
# which contains a list of actual flow cfgs.
hjson_cfg_dict_keys = hjson_dict.keys()
return ("use_cfgs" in hjson_cfg_dict_keys and type(hjson_dict["use_cfgs"]) is list)
def _set_attribute(self, path, key, dict_val):
'''Set an attribute from an hjson file
The path argument is the path for the hjson file that we're reading.
'''
# Is this value overridden on the command line? If so, use the override
# instead.
args_val = None
if key in _CMDLINE_FIELDS:
args_val = getattr(self.args, key, None)
override_msg = ''
if args_val is not None:
dict_val = args_val
override_msg = ' from command-line override'
self_val = getattr(self, key, None)
if self_val is None:
# A new attribute (or the old value was None, in which case it's
# just a placeholder and needs writing). Set it and return.
setattr(self, key, dict_val)
return
# This is already an attribute. Firstly, we need to make sure the types
# are compatible.
if type(dict_val) != type(self_val):
log.error("Conflicting types for key {!r} when loading {!r}. "
"Cannot override value {!r} with {!r} (which is of "
"type {}{})."
.format(key, path, self_val, dict_val,
type(dict_val).__name__, override_msg))
sys.exit(1)
# Looks like the types are compatible. If they are lists, concatenate
# them.
if isinstance(self_val, list):
setattr(self, key, self_val + dict_val)
return
# Otherwise, check whether this is a type we know how to deal with.
scalar_types = {str: [""], int: [0, -1], bool: [False]}
defaults = scalar_types.get(type(dict_val))
if defaults is None:
log.error("When loading {!r} and setting key {!r}, found a value "
"of {!r}{} with unsupported type {}."
.format(path, key, dict_val,
override_msg, type(dict_val).__name__))
sys.exit(1)
# If the values are equal, there's nothing more to do
if self_val == dict_val:
return
old_is_default = self_val in defaults
new_is_default = dict_val in defaults
# Similarly, if new value looks like a default, ignore it (regardless
# of whether the current value looks like a default).
if new_is_default:
return
# If the existing value looks like a default and the new value doesn't,
# take the new value.
if old_is_default:
setattr(self, key, dict_val)
return
# Neither value looks like a default. Raise an error.
log.error("When loading {!r}, key {!r} is given a value of "
"{!r}{}, but the key is already set to {!r}."
.format(path, key, dict_val, override_msg, self_val))
sys.exit(1)
def resolve_hjson_raw(self, path, hjson_dict):
import_cfgs = []
use_cfgs = []
for key, dict_val in hjson_dict.items():
# If key is 'import_cfgs' then add to the list of cfgs to process
if key == 'import_cfgs':
import_cfgs.extend(dict_val)
continue
# If the key is 'use_cfgs', we're only allowed to take effect for a
# primary config list. If we are in a primary config list, add it.
if key == 'use_cfgs':
if not self.is_primary_cfg:
log.error("Key 'use_cfgs' encountered in the non-primary "
"cfg file list {!r}."
.format(path))
sys.exit(1)
use_cfgs.extend(dict_val)
continue
# Otherwise, set an attribute on self.
self._set_attribute(path, key, dict_val)
# Parse imported cfgs
for cfg_file in import_cfgs:
if cfg_file not in self.imported_cfg_files:
self.imported_cfg_files.append(cfg_file)
# Substitute wildcards in cfg_file files since we need to process
# them right away.
cfg_file = subst_wildcards(cfg_file, self.__dict__)
self._parse_cfg(cfg_file, False)
else:
log.error("Cfg file \"%s\" has already been parsed", cfg_file)
# Parse primary cfg files
if self.is_primary_cfg:
for entry in use_cfgs:
if type(entry) is str:
# Treat this as a file entry
# Substitute wildcards in cfg_file files since we need to process
# them right away.
cfg_file = subst_wildcards(entry,
self.__dict__,
ignore_error=True)
self.cfgs.append(self.create_instance(cfg_file))
elif type(entry) is dict:
# Treat this as a cfg expanded in-line
temp_cfg_file = self._conv_inline_cfg_to_hjson(entry)
if not temp_cfg_file:
continue
self.cfgs.append(self.create_instance(temp_cfg_file))
# Delete the temp_cfg_file once the instance is created
try:
log.log(VERBOSE, "Deleting temp cfg file:\n%s",
temp_cfg_file)
os.system("/bin/rm -rf " + temp_cfg_file)
except IOError:
log.error("Failed to remove temp cfg file:\n%s",
temp_cfg_file)
else:
log.error(
"Type of entry \"%s\" in the \"use_cfgs\" key is invalid: %s",
entry, str(type(entry)))
sys.exit(1)
def _conv_inline_cfg_to_hjson(self, idict):
'''Dump a temp hjson file in the scratch space from input dict.
This method is to be called only by a primary cfg'''
if not self.is_primary_cfg:
log.fatal("This method can only be called by a primary cfg")
sys.exit(1)
name = idict["name"] if "name" in idict.keys() else None
if not name:
log.error("In-line entry in use_cfgs list does not contain "
"a \"name\" key (will be skipped!):\n%s",
idict)
return None
# Check if temp cfg file already exists
temp_cfg_file = (self.scratch_root + "/." + self.branch + "__" +
name + "_cfg.hjson")
# Create the file and dump the dict as hjson
log.log(VERBOSE, "Dumping inline cfg \"%s\" in hjson to:\n%s", name,
temp_cfg_file)
try:
with open(temp_cfg_file, "w") as f:
f.write(hjson.dumps(idict, for_json=True))
except Exception as e:
log.error("Failed to hjson-dump temp cfg file\"%s\" for \"%s\""
"(will be skipped!) due to:\n%s",
temp_cfg_file, name, e)
return None
# Return the temp cfg file created
return temp_cfg_file
def _process_overrides(self):
# Look through the dict and find available overrides.
# If override is available, check if the type of the value for existing
# and the overridden keys are the same.
overrides_dict = {}
if hasattr(self, "overrides"):
overrides = getattr(self, "overrides")
if type(overrides) is not list:
log.error(
"The type of key \"overrides\" is %s - it should be a list",
type(overrides))
sys.exit(1)
# Process override one by one
for item in overrides:
if type(item) is dict and set(item.keys()) == {"name", "value"}:
ov_name = item["name"]
ov_value = item["value"]
if ov_name not in overrides_dict.keys():
overrides_dict[ov_name] = ov_value
self._do_override(ov_name, ov_value)
else:
log.error(
"Override for key \"%s\" already exists!\nOld: %s\nNew: %s",
ov_name, overrides_dict[ov_name], ov_value)
sys.exit(1)
else:
log.error("\"overrides\" is a list of dicts with {\"name\": <name>, "
"\"value\": <value>} pairs. Found this instead:\n%s",
str(item))
sys.exit(1)
def _do_override(self, ov_name, ov_value):
# Go through self attributes and replace with overrides
if hasattr(self, ov_name):
orig_value = getattr(self, ov_name)
if type(orig_value) == type(ov_value):
log.debug("Overriding \"%s\" value \"%s\" with \"%s\"",
ov_name, orig_value, ov_value)
setattr(self, ov_name, ov_value)
else:
log.error("The type of override value \"%s\" for \"%s\" mismatches "
"the type of original value \"%s\"",
ov_value, ov_name, orig_value)
sys.exit(1)
else:
log.error("Override key \"%s\" not found in the cfg!", ov_name)
sys.exit(1)
def _process_exports(self):
# Convert 'exports' to dict
exports_dict = {}
if self.exports != []:
for item in self.exports:
if type(item) is dict:
exports_dict.update(item)
elif type(item) is str:
[key, value] = item.split(':', 1)
if type(key) is not str:
key = str(key)
if type(value) is not str:
value = str(value)
exports_dict.update({key.strip(): value.strip()})
else:
log.error("Type error in \"exports\": %s", str(item))
sys.exit(1)
self.exports = exports_dict
def _purge(self):
'''Purge the existing scratch areas in preperation for the new run.'''
return
def purge(self):
'''Public facing API for _purge().
'''
for item in self.cfgs:
item._purge()
def _print_list(self):
'''Print the list of available items that can be kicked off.
'''
return
def print_list(self):
'''Public facing API for _print_list().
'''
for item in self.cfgs:
item._print_list()
def prune_selected_cfgs(self):
'''Prune the list of configs for a primary config file'''
# This should run after self.cfgs has been set
assert self.cfgs
# If the user didn't pass --select-cfgs, we don't do anything.
if self.select_cfgs is None:
return
# If the user passed --select-cfgs, but this isn't a primary config
# file, we should probably complain.
if not self.is_primary_cfg:
log.error('The configuration file at {!r} is not a primary config, '
'but --select-cfgs was passed on the command line.'
.format(self.flow_cfg_file))
sys.exit(1)
# Filter configurations
self.cfgs = [c for c in self.cfgs if c.name in self.select_cfgs]
def _create_deploy_objects(self):
'''Create deploy objects from items that were passed on for being run.
The deploy objects for build and run are created from the objects that were
created from the create_objects() method.
'''
return
def create_deploy_objects(self):
'''Public facing API for _create_deploy_objects().
'''
self.prune_selected_cfgs()
if self.is_primary_cfg:
self.deploy = []
for item in self.cfgs:
item._create_deploy_objects()
self.deploy.extend(item.deploy)
else:
self._create_deploy_objects()
def deploy_objects(self):
'''Public facing API for deploying all available objects.'''
Deploy.deploy(self.deploy)
def _gen_results(self, fmt="md"):
'''
The function is called after the regression has completed. It collates the
status of all run targets and generates a dict. It parses the testplan and
maps the generated result to the testplan entries to generate a final table
(list). It also prints the full list of failures for debug / triage. The
final result is in markdown format.
'''
return
def gen_results(self):
'''Public facing API for _gen_results().
'''
results = []
for item in self.cfgs:
result = item._gen_results()
log.info("[results]: [%s]:\n%s\n\n", item.name, result)
results.append(result)
self.errors_seen |= item.errors_seen
if self.is_primary_cfg:
self.gen_results_summary()
self.gen_email_html_summary()
def gen_results_summary(self):
'''Public facing API to generate summary results for each IP/cfg file
'''
return
def _get_results_page_link(self, link_text):
if not self.args.publish:
return link_text
results_page_url = self.results_server_page.replace(
self.results_server_prefix, self.results_server_url_prefix)
return "[%s](%s)" % (link_text, results_page_url)
def gen_email_html_summary(self):
if self.is_primary_cfg:
# user can customize email content by using email_summary_md,
# otherwise default to send out results_summary_md
gen_results = self.email_summary_md or self.results_summary_md
else:
gen_results = self.email_results_md or self.results_md
results_html = md_results_to_html(self.results_title, self.css_file, gen_results)
results_html_file = self.scratch_root + "/email.html"
f = open(results_html_file, 'w')
f.write(results_html)
f.close()
log.info("[results summary]: %s [%s]", "generated for email purpose", results_html_file)
def _publish_results(self):
'''Publish results to the opentitan web server.
Results are uploaded to {results_server_path}/latest/results.
If the 'latest' directory exists, then it is renamed to its 'timestamp' directory.
If the list of directories in this area is > 14, then the oldest entry is removed.
Links to the last 7 regression results are appended at the end if the results page.
'''
if which('gsutil') is None or which('gcloud') is None:
log.error(
"Google cloud SDK not installed! Cannot access the results server"
)
return
# Construct the paths
results_page_url = self.results_server_page.replace(
self.results_server_prefix, self.results_server_url_prefix)
# Timeformat for moving the dir
tf = "%Y.%m.%d_%H.%M.%S"
# Extract the timestamp of the existing self.results_server_page
cmd = self.results_server_cmd + " ls -L " + self.results_server_page + \
" | grep \'Creation time:\'"
log.log(VERBOSE, cmd)
cmd_output = subprocess.run(cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
old_results_ts = cmd_output.stdout.decode("utf-8")
old_results_ts = old_results_ts.replace("Creation time:", "")
old_results_ts = old_results_ts.strip()
# Move the 'latest' to its timestamp directory if lookup succeeded
if cmd_output.returncode == 0:
try:
if old_results_ts != "":
ts = datetime.datetime.strptime(
old_results_ts, "%a, %d %b %Y %H:%M:%S %Z")
old_results_ts = ts.strftime(tf)
except ValueError as e:
log.error(
"%s: \'%s\' Timestamp conversion value error raised!", e)
old_results_ts = ""
# If the timestamp conversion failed - then create a dummy one with
# yesterday's date.
if old_results_ts == "":
log.log(VERBOSE,
"Creating dummy timestamp with yesterday's date")
ts = datetime.datetime.now(
datetime.timezone.utc) - datetime.timedelta(days=1)
old_results_ts = ts.strftime(tf)
old_results_dir = self.results_server_path + "/" + old_results_ts
cmd = (self.results_server_cmd + " mv " + self.results_server_dir +
" " + old_results_dir)
log.log(VERBOSE, cmd)
cmd_output = subprocess.run(cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
if cmd_output.returncode != 0:
log.error("Failed to mv old results page \"%s\" to \"%s\"!",
self.results_server_dir, old_results_dir)
# Do an ls in the results root dir to check what directories exist.
results_dirs = []
cmd = self.results_server_cmd + " ls " + self.results_server_path
log.log(VERBOSE, cmd)
cmd_output = subprocess.run(args=cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
if cmd_output.returncode == 0:
# Some directories exist. Check if 'latest' is one of them
results_dirs = cmd_output.stdout.decode("utf-8").strip()
results_dirs = results_dirs.split("\n")
else:
log.log(VERBOSE, "Failed to run \"%s\"!", cmd)
# Start pruning
log.log(VERBOSE, "Pruning %s area to limit last 7 results",
self.results_server_path)
rdirs = []
for rdir in results_dirs:
dirname = rdir.replace(self.results_server_path, '')
dirname = dirname.replace('/', '')
if dirname == "latest":
continue
rdirs.append(dirname)
rdirs.sort(reverse=True)
rm_cmd = ""
history_txt = "\n## Past Results\n"
history_txt += "- [Latest](" + results_page_url + ")\n"
if len(rdirs) > 0:
for i in range(len(rdirs)):
if i < 7:
rdir_url = self.results_server_path + '/' + rdirs[
i] + "/" + self.results_server_html
rdir_url = rdir_url.replace(self.results_server_prefix,
self.results_server_url_prefix)
history_txt += "- [{}]({})\n".format(rdirs[i], rdir_url)
elif i > 14:
rm_cmd += self.results_server_path + '/' + rdirs[i] + " "
if rm_cmd != "":
rm_cmd = self.results_server_cmd + " -m rm -r " + rm_cmd + "; "
# Append the history to the results.
publish_results_md = self.publish_results_md or self.results_md
publish_results_md = publish_results_md + history_txt
# Publish the results page.
# First, write the results html file temporarily to the scratch area.
results_html_file = self.scratch_path + "/results_" + self.timestamp + ".html"
f = open(results_html_file, 'w')
f.write(
md_results_to_html(self.results_title, self.css_file, publish_results_md))
f.close()
rm_cmd += "/bin/rm -rf " + results_html_file + "; "
log.info("Publishing results to %s", results_page_url)
cmd = (self.results_server_cmd + " cp " + results_html_file + " " +
self.results_server_page + "; " + rm_cmd)
log.log(VERBOSE, cmd)
try:
cmd_output = subprocess.run(args=cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
except Exception as e:
log.error("%s: Failed to publish results:\n\"%s\"", e, str(cmd))
def publish_results(self):
'''Public facing API for publishing results to the opentitan web server.
'''
for item in self.cfgs:
item._publish_results()
if self.is_primary_cfg:
self.publish_results_summary()
def publish_results_summary(self):
'''Public facing API for publishing md format results to the opentitan web server.
'''
results_html_file = "summary_" + self.timestamp + ".html"
results_page_url = self.results_summary_server_page.replace(
self.results_server_prefix, self.results_server_url_prefix)
# Publish the results page.
# First, write the results html file temporarily to the scratch area.
f = open(results_html_file, 'w')
f.write(
md_results_to_html(self.results_title, self.css_file, self.results_summary_md))
f.close()
rm_cmd = "/bin/rm -rf " + results_html_file + "; "
log.info("Publishing results summary to %s", results_page_url)
cmd = (self.results_server_cmd + " cp " + results_html_file + " " +
self.results_summary_server_page + "; " + rm_cmd)
log.log(VERBOSE, cmd)
try:
cmd_output = subprocess.run(args=cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
except Exception as e:
log.error("%s: Failed to publish results:\n\"%s\"", e, str(cmd))
def has_errors(self):
return self.errors_seen